From 4ec23b60095186805e719eff2af6f38a4f837bd4 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Wed, 20 Feb 2019 16:15:17 +0100 Subject: [PATCH 01/39] Initial commit --- ha.go | 428 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 428 insertions(+) create mode 100644 ha.go diff --git a/ha.go b/ha.go new file mode 100644 index 00000000..9c3eb512 --- /dev/null +++ b/ha.go @@ -0,0 +1,428 @@ +package icingadb_ha_lib + +import ( + "database/sql" + "git.icinga.com/icingadb-connection" + "github.com/go-redis/redis" + "github.com/google/uuid" + "sync/atomic" + "time" + log "github.com/sirupsen/logrus" +) + +// responsibility tells whether we're responsible for our environment. +type responsibility uint8 + +// readyForTakeover says that we aren't responsible, but we could take over. +const readyForTakeover responsibility = 0 + +// TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet. +const TakeoverNoSync responsibility = 1 + +// TakeoverSync says that we've taken over and are actually syncing config. +const TakeoverSync responsibility = 2 + +// stop says that we've taken over and are actually syncing config, but we're going to stop it. +const stop responsibility = 3 + +// notReadyForTakeover says that we aren't responsible and can't take over. +const notReadyForTakeover responsibility = 4 + +// responsibilityAction tells the next action on the database. +type responsibilityAction uint8 + +// noAction says that we won't do anything. +const noAction responsibilityAction = 0 + +// tryTakeover says that we're going to try to take over. +const tryTakeover responsibilityAction = 1 + +// doTakeover says that we're going to take over. +const doTakeover responsibilityAction = 2 + +// ceaseOperation says that we're going to release our responsibility. +const ceaseOperation responsibilityAction = 3 + +type HA struct { + ourUUID uuid.UUID + icinga2MTime int64 + // responsibility tells whether we're responsible for our environment. + responsibility uint32 + // responsibleSince tells since when we're responsible for our environment. + responsibleSince time.Time + // runningCriticalOperations counts the currently running critical operations. + runningCriticalOperations uint64 + // lastCriticalOperationEnd tells when the last critical operation finished. + lastCriticalOperationEnd int64 +} + +// RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible. +func (h *HA) RunCriticalOperation(op func() error) error { + switch h.getResponsibility() { + case TakeoverSync, stop: + atomic.AddUint64(&h.runningCriticalOperations, 1) + + err := op() + + atomic.StoreInt64(&h.lastCriticalOperationEnd, time.Now().Unix()) + atomic.AddUint64(&h.runningCriticalOperations, ^uint64(0)) + + return err + } + + return nil +} + +func (h *HA) Icinga2HeartBeat() { + atomic.StoreInt64(&h.icinga2MTime, time.Now().Unix()) +} + +func (h *HA) IsResponsible() bool { + return h.getResponsibility() == TakeoverSync +} + +func (h *HA) Run(rdb *redis.Client, dbw *connection.DBWrapper, chEnv chan *connection.Environment, chErr chan error) { + go cleanUpInstancesAsync(dbw, chErr) + + if errRun := h.run(rdb, dbw, chEnv); errRun != nil { + chErr <- errRun + return + } +} + +// cleanUpInstancesAsync cleans up icingadb_instance periodically. +func cleanUpInstancesAsync(dbw *connection.DBWrapper, chErr chan error) { + if errCI := cleanUpInstances(dbw); errCI != nil { + chErr <- errCI + } +} + +// cleanUpInstances cleans up icingadb_instance periodically. +func cleanUpInstances(dbw *connection.DBWrapper) error { + every5m := time.NewTicker(5 * time.Minute) + defer every5m.Stop() + + for { + <-every5m.C + + log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") + + errTx := dbw.SqlTransaction(true, true, func(tx *sql.Tx) error { + _, errExec := dbw.SqlExec( + tx, + "delete from icingadb_instance by heartbeat", + `DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`, + time.Now().Unix(), + ) + + return errExec + }) + if errTx != nil { + return errTx + } + } +} + +func (h *HA) run(rdb *redis.Client, dbw *connection.DBWrapper, chEnv chan *connection.Environment) error { + log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") + + var env *connection.Environment = nil + var hasEnv bool + + env, hasEnv = <-chEnv + if !hasEnv { + return nil + } + + var errNR error + if h.ourUUID, errNR = uuid.NewRandom(); errNR != nil { + return errNR + } + + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": env.Name, + }).Info("Received environment from Icinga 2") + + everySecond := time.NewTicker(time.Second) + defer everySecond.Stop() + + var nextAction responsibilityAction + var theirUUID uuid.UUID + + // Even if Icinga 2 is offline now, Redis may be filled + h.Icinga2HeartBeat() + + for { + switch h.getResponsibility() { + case readyForTakeover: + if !h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": env.Name, + }).Warn("Icinga 2 detected as not running, stopping.") + + h.setResponsibility(notReadyForTakeover) + continue + } + + nextAction = tryTakeover + case TakeoverNoSync: + if !h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": env.Name, + }).Warn("Icinga 2 detected as not running, stopping.") + + h.setResponsibility(stop) + continue + } + + nextAction = tryTakeover + case TakeoverSync: + if !h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": env.Name, + }).Warn("Icinga 2 detected as not running, stopping.") + + h.setResponsibility(stop) + continue + } + + nextAction = doTakeover + case stop: + if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { + nextAction = ceaseOperation + } else { + nextAction = doTakeover + } + case notReadyForTakeover: + if h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": env.Name, + }).Info("Icinga 2 detected as running again.") + + h.setResponsibility(readyForTakeover) + continue + } + + nextAction = noAction + } + + switch nextAction { + case noAction: + break + case tryTakeover, doTakeover: + var justTakenOver bool + + errTx := dbw.SqlTransactionQuiet(true, true, func(tx *sql.Tx) error { + { + rows, errFA := dbw.SqlFetchAllQuiet( + tx, + "select from icingadb_instance by id", + `SELECT 1 FROM icingadb_instance WHERE id = ?`, + h.ourUUID[:], + ) + if errFA != nil { + return errFA + } + + if len(rows) > 0 { + _, errExec := dbw.SqlExecQuiet( + tx, + "update icingadb_instance by id", + `UPDATE icingadb_instance SET environment_id=?, heartbeat=? WHERE id = ?`, + env.ID, + time.Now().Unix(), + h.ourUUID[:], + ) + if errExec != nil { + return errExec + } + } else { + _, errExec := dbw.SqlExecQuiet( + tx, + "insert into icingadb_instance", + `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, + h.ourUUID[:], + env.ID, + time.Now().Unix(), + "n", + ) + if errExec != nil { + return errExec + } + } + } + + justTakenOver = false + + rows, errFA := dbw.SqlFetchAllQuiet( + tx, + "select from icingadb_instance by environment_id, responsible", + `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ?`, + env.ID, + "y", + ) + if errFA != nil { + return errFA + } + + if len(rows) > 0 { + copy(theirUUID[:], rows[0][0].([]byte)) + + if theirUUID == h.ourUUID { + justTakenOver = true + } else if time.Now().Unix()-rows[0][1].(int64) >= 10 { + { + _, errExec := dbw.SqlExecQuiet( + tx, + "update icingadb_instance by environment_id", + `UPDATE icingadb_instance SET responsible=? WHERE environment_id = ?`, + "n", + env.ID, + ) + if errExec != nil { + return errExec + } + } + + _, errExec := dbw.SqlExecQuiet( + tx, + "update icingadb_instance by id", + `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, + "y", + h.ourUUID[:], + ) + if errExec != nil { + return errExec + } + + justTakenOver = true + } + } else { + _, errExec := dbw.SqlExecQuiet( + tx, + "update icingadb_instance by id", + `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, + "y", + h.ourUUID[:], + ) + if errExec != nil { + return errExec + } + + justTakenOver = true + } + + return nil + }) + if errTx != nil { + return errTx + } + + if justTakenOver && h.getResponsibility() != stop { + if h.responsibleSince == (time.Time{}) { + h.responsibleSince = time.Now() + h.setResponsibility(TakeoverNoSync) + } else { + responsibleFor := time.Now().Sub(h.responsibleSince).Seconds() + + if responsibleFor >= 5.0 { + if h.setResponsibility(TakeoverSync) == TakeoverNoSync { + log.WithFields(log.Fields{ + "context": "HA", + "env": env.Name, + "their_uuid": theirUUID.String(), + }).Info("Taking over") + } + + if _, errRP := rdb.Publish("icingadb:wakeup", h.ourUUID.String()).Result(); errRP != nil { + return errRP + } + } + } + } + + if !justTakenOver { + log.WithFields(log.Fields{ + "context": "HA", + "env": env.Name, + "their_uuid": theirUUID.String(), + }).Info("Other instance is responsible") + } + case ceaseOperation: + errTx := dbw.SqlTransactionQuiet(true, true, func(tx *sql.Tx) error { + rows, errFA := dbw.SqlFetchAllQuiet( + tx, + "select from icingadb_instance by environment_id, responsible, heartbeat", + `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND ? - heartbeat < 10`, + env.ID, + "n", + time.Now().Unix(), + ) + if errFA != nil { + return errFA + } + + if len(rows) > 0 { + _, errExec := dbw.SqlExecQuiet( + tx, + "update icingadb_instance", + `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, + "n", + h.ourUUID[:], + ) + + return errExec + } + + return nil + }) + if errTx != nil { + return errTx + } + + log.WithFields(log.Fields{ + "context": "HA", + "env": env.Name, + }).Info("Other instance is responsible. Ceasing operations.") + + h.responsibleSince = time.Time{} + h.setResponsibility(notReadyForTakeover) + } + + select { + case env, hasEnv = <-chEnv: + if !hasEnv { + return nil + } + + <-everySecond.C + case <-everySecond.C: + break + } + } +} + +// icinga2IsAlive returns whether Icinga 2 seems to be running. +func (h *HA) icinga2IsAlive() bool { + return time.Now().Unix()-atomic.LoadInt64(&h.icinga2MTime) < 15 +} + +// getResponsibility gets the responsibility. +func (h *HA) getResponsibility() responsibility { + return responsibility(atomic.LoadUint32(&h.responsibility)) +} + +// setResponsibility sets the responsibility and returns the previous one. +func (h *HA) setResponsibility(r responsibility) responsibility { + return responsibility(atomic.SwapUint32(&h.responsibility, uint32(r))) +} From 2281f3ec5290aa788e6bdd24edab77ac5e6a53a7 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Wed, 20 Feb 2019 16:31:52 +0100 Subject: [PATCH 02/39] Update include --- ha.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ha.go b/ha.go index 9c3eb512..4a8e41c0 100644 --- a/ha.go +++ b/ha.go @@ -81,7 +81,7 @@ func (h *HA) IsResponsible() bool { return h.getResponsibility() == TakeoverSync } -func (h *HA) Run(rdb *redis.Client, dbw *connection.DBWrapper, chEnv chan *connection.Environment, chErr chan error) { +func (h *HA) Run(rdb *redis.Client, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { go cleanUpInstancesAsync(dbw, chErr) if errRun := h.run(rdb, dbw, chEnv); errRun != nil { @@ -91,14 +91,14 @@ func (h *HA) Run(rdb *redis.Client, dbw *connection.DBWrapper, chEnv chan *conne } // cleanUpInstancesAsync cleans up icingadb_instance periodically. -func cleanUpInstancesAsync(dbw *connection.DBWrapper, chErr chan error) { +func cleanUpInstancesAsync(dbw *icingadb_connection.DBWrapper, chErr chan error) { if errCI := cleanUpInstances(dbw); errCI != nil { chErr <- errCI } } // cleanUpInstances cleans up icingadb_instance periodically. -func cleanUpInstances(dbw *connection.DBWrapper) error { +func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { every5m := time.NewTicker(5 * time.Minute) defer every5m.Stop() @@ -123,10 +123,10 @@ func cleanUpInstances(dbw *connection.DBWrapper) error { } } -func (h *HA) run(rdb *redis.Client, dbw *connection.DBWrapper, chEnv chan *connection.Environment) error { +func (h *HA) run(rdb *redis.Client, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") - var env *connection.Environment = nil + var env *icingadb_connection.Environment = nil var hasEnv bool env, hasEnv = <-chEnv From be976dc22a31b9916309f423ba32ecbe713b78ff Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Thu, 21 Feb 2019 13:37:20 +0100 Subject: [PATCH 03/39] Add initial tests --- ha_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 ha_test.go diff --git a/ha_test.go b/ha_test.go new file mode 100644 index 00000000..73e55c43 --- /dev/null +++ b/ha_test.go @@ -0,0 +1,34 @@ +package icingadb_ha_lib + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestHA_setResponsibility(t *testing.T) { + responsibilities := [5]responsibility{ readyForTakeover, TakeoverNoSync, TakeoverSync, stop } + h := new(HA) + + previous := responsibility(0) + for _,r := range responsibilities { + assert.Equal(t, previous, h.setResponsibility(r), "Should be equal") + previous = r + } +} + +func TestHA_IsResponsible(t *testing.T) { + h := new(HA) + h.setResponsibility(TakeoverSync) + assert.True(t, h.IsResponsible(), "Should be responsible") + h.setResponsibility(TakeoverNoSync) + assert.False(t, h.IsResponsible(), "Should not be responsible") +} + +func TestHA_icinga2IsAlive(t *testing.T) { + h := new(HA) + h.icinga2MTime = time.Now().Unix() - 5 + assert.True(t, h.icinga2IsAlive(), "Should be alive") + h.icinga2MTime = h.icinga2MTime - 15 + assert.False(t, h.icinga2IsAlive(), "Should be dead") +} \ No newline at end of file From a77527fffd40a0cd3dfb21142700f7bcd7c13540 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Thu, 21 Feb 2019 14:23:45 +0100 Subject: [PATCH 04/39] Clean up function --- ha.go | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/ha.go b/ha.go index 4a8e41c0..6550f9f7 100644 --- a/ha.go +++ b/ha.go @@ -5,9 +5,9 @@ import ( "git.icinga.com/icingadb-connection" "github.com/go-redis/redis" "github.com/google/uuid" + log "github.com/sirupsen/logrus" "sync/atomic" "time" - log "github.com/sirupsen/logrus" ) // responsibility tells whether we're responsible for our environment. @@ -92,37 +92,36 @@ func (h *HA) Run(rdb *redis.Client, dbw *icingadb_connection.DBWrapper, chEnv ch // cleanUpInstancesAsync cleans up icingadb_instance periodically. func cleanUpInstancesAsync(dbw *icingadb_connection.DBWrapper, chErr chan error) { - if errCI := cleanUpInstances(dbw); errCI != nil { - chErr <- errCI - } -} - -// cleanUpInstances cleans up icingadb_instance periodically. -func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { every5m := time.NewTicker(5 * time.Minute) defer every5m.Stop() for { <-every5m.C - log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") - - errTx := dbw.SqlTransaction(true, true, func(tx *sql.Tx) error { - _, errExec := dbw.SqlExec( - tx, - "delete from icingadb_instance by heartbeat", - `DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`, - time.Now().Unix(), - ) - - return errExec - }) - if errTx != nil { - return errTx + if errCI := cleanUpInstances(dbw); errCI != nil { + chErr <- errCI } } } +// cleanUpInstances cleans up icingadb_instance periodically. +func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { + + log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") + + errTx := dbw.SqlTransaction(true, true, func(tx *sql.Tx) error { + _, errExec := dbw.SqlExec( + tx, + "delete from icingadb_instance by heartbeat", + `DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`, + time.Now().Unix(), + ) + + return errExec + }) + return errTx +} + func (h *HA) run(rdb *redis.Client, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") From 0e874501bdb30effb53a449d93f7a0caa0f3d348 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 11:14:54 +0100 Subject: [PATCH 05/39] Use icingadb_connection redis --- ha.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ha.go b/ha.go index 6550f9f7..cc64c1a4 100644 --- a/ha.go +++ b/ha.go @@ -3,7 +3,6 @@ package icingadb_ha_lib import ( "database/sql" "git.icinga.com/icingadb-connection" - "github.com/go-redis/redis" "github.com/google/uuid" log "github.com/sirupsen/logrus" "sync/atomic" @@ -81,7 +80,7 @@ func (h *HA) IsResponsible() bool { return h.getResponsibility() == TakeoverSync } -func (h *HA) Run(rdb *redis.Client, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { +func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { go cleanUpInstancesAsync(dbw, chErr) if errRun := h.run(rdb, dbw, chEnv); errRun != nil { @@ -122,7 +121,7 @@ func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { return errTx } -func (h *HA) run(rdb *redis.Client, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { +func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") var env *icingadb_connection.Environment = nil From 24df4c40312716b09fdf9f11e674be976b262ebb Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 12:07:06 +0100 Subject: [PATCH 06/39] Use enum for responsibility --- ha.go | 67 +++++++++++++++++++++++++++-------------------------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/ha.go b/ha.go index cc64c1a4..046c4f4e 100644 --- a/ha.go +++ b/ha.go @@ -9,23 +9,18 @@ import ( "time" ) -// responsibility tells whether we're responsible for our environment. -type responsibility uint8 - -// readyForTakeover says that we aren't responsible, but we could take over. -const readyForTakeover responsibility = 0 - -// TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet. -const TakeoverNoSync responsibility = 1 - -// TakeoverSync says that we've taken over and are actually syncing config. -const TakeoverSync responsibility = 2 - -// stop says that we've taken over and are actually syncing config, but we're going to stop it. -const stop responsibility = 3 - -// notReadyForTakeover says that we aren't responsible and can't take over. -const notReadyForTakeover responsibility = 4 +const ( + // readyForTakeover says that we aren't responsible, but we could take over. + resp_ReadyForTakeover = iota + // TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet. + resp_TakeoverNoSync = iota + // TakeoverSync says that we've taken over and are actually syncing config. + resp_TakeoverSync = iota + // stop says that we've taken over and are actually syncing config, but we're going to stop it. + resp_Stop = iota + // notReadyForTakeover says that we aren't responsible and can't take over. + resp_NotReadyForTakeover = iota +) // responsibilityAction tells the next action on the database. type responsibilityAction uint8 @@ -58,7 +53,7 @@ type HA struct { // RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible. func (h *HA) RunCriticalOperation(op func() error) error { switch h.getResponsibility() { - case TakeoverSync, stop: + case resp_TakeoverSync, resp_Stop: atomic.AddUint64(&h.runningCriticalOperations, 1) err := op() @@ -77,7 +72,7 @@ func (h *HA) Icinga2HeartBeat() { } func (h *HA) IsResponsible() bool { - return h.getResponsibility() == TakeoverSync + return h.getResponsibility() == resp_TakeoverSync } func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { @@ -154,7 +149,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D for { switch h.getResponsibility() { - case readyForTakeover: + case resp_ReadyForTakeover: if !h.icinga2IsAlive() { log.WithFields(log.Fields{ "context": "HA", @@ -162,12 +157,12 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": env.Name, }).Warn("Icinga 2 detected as not running, stopping.") - h.setResponsibility(notReadyForTakeover) + h.setResponsibility(resp_NotReadyForTakeover) continue } nextAction = tryTakeover - case TakeoverNoSync: + case resp_TakeoverNoSync: if !h.icinga2IsAlive() { log.WithFields(log.Fields{ "context": "HA", @@ -175,12 +170,12 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": env.Name, }).Warn("Icinga 2 detected as not running, stopping.") - h.setResponsibility(stop) + h.setResponsibility(resp_Stop) continue } nextAction = tryTakeover - case TakeoverSync: + case resp_TakeoverSync: if !h.icinga2IsAlive() { log.WithFields(log.Fields{ "context": "HA", @@ -188,18 +183,18 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": env.Name, }).Warn("Icinga 2 detected as not running, stopping.") - h.setResponsibility(stop) + h.setResponsibility(resp_Stop) continue } nextAction = doTakeover - case stop: + case resp_Stop: if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { nextAction = ceaseOperation } else { nextAction = doTakeover } - case notReadyForTakeover: + case resp_NotReadyForTakeover: if h.icinga2IsAlive() { log.WithFields(log.Fields{ "context": "HA", @@ -207,7 +202,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": env.Name, }).Info("Icinga 2 detected as running again.") - h.setResponsibility(readyForTakeover) + h.setResponsibility(resp_ReadyForTakeover) continue } @@ -326,15 +321,15 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D return errTx } - if justTakenOver && h.getResponsibility() != stop { + if justTakenOver && h.getResponsibility() != resp_Stop { if h.responsibleSince == (time.Time{}) { h.responsibleSince = time.Now() - h.setResponsibility(TakeoverNoSync) + h.setResponsibility(resp_TakeoverNoSync) } else { responsibleFor := time.Now().Sub(h.responsibleSince).Seconds() if responsibleFor >= 5.0 { - if h.setResponsibility(TakeoverSync) == TakeoverNoSync { + if h.setResponsibility(resp_TakeoverSync) == resp_TakeoverNoSync { log.WithFields(log.Fields{ "context": "HA", "env": env.Name, @@ -394,7 +389,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D }).Info("Other instance is responsible. Ceasing operations.") h.responsibleSince = time.Time{} - h.setResponsibility(notReadyForTakeover) + h.setResponsibility(resp_NotReadyForTakeover) } select { @@ -416,11 +411,11 @@ func (h *HA) icinga2IsAlive() bool { } // getResponsibility gets the responsibility. -func (h *HA) getResponsibility() responsibility { - return responsibility(atomic.LoadUint32(&h.responsibility)) +func (h *HA) getResponsibility() uint32 { + return atomic.LoadUint32(&h.responsibility) } // setResponsibility sets the responsibility and returns the previous one. -func (h *HA) setResponsibility(r responsibility) responsibility { - return responsibility(atomic.SwapUint32(&h.responsibility, uint32(r))) +func (h *HA) setResponsibility(r uint32) uint32 { + return atomic.SwapUint32(&h.responsibility, uint32(r)) } From 74f6be698778cda71e6cb39fd450bdee01dadb25 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 12:37:25 +0100 Subject: [PATCH 07/39] Use enum for action --- ha.go | 52 ++++++++++++++++++++++++---------------------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/ha.go b/ha.go index 046c4f4e..fbedb290 100644 --- a/ha.go +++ b/ha.go @@ -13,29 +13,25 @@ const ( // readyForTakeover says that we aren't responsible, but we could take over. resp_ReadyForTakeover = iota // TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet. - resp_TakeoverNoSync = iota + resp_TakeoverNoSync // TakeoverSync says that we've taken over and are actually syncing config. - resp_TakeoverSync = iota + resp_TakeoverSync // stop says that we've taken over and are actually syncing config, but we're going to stop it. - resp_Stop = iota + resp_Stop // notReadyForTakeover says that we aren't responsible and can't take over. - resp_NotReadyForTakeover = iota + resp_NotReadyForTakeover ) -// responsibilityAction tells the next action on the database. -type responsibilityAction uint8 - -// noAction says that we won't do anything. -const noAction responsibilityAction = 0 - -// tryTakeover says that we're going to try to take over. -const tryTakeover responsibilityAction = 1 - -// doTakeover says that we're going to take over. -const doTakeover responsibilityAction = 2 - -// ceaseOperation says that we're going to release our responsibility. -const ceaseOperation responsibilityAction = 3 +const ( + // noAction says that we won't do anything. + action_NoAction = iota + // tryTakeover says that we're going to try to take over. + action_TryTakeover + // doTakeover says that we're going to take over. + action_DoTakeover + // ceaseOperation says that we're going to release our responsibility. + action_CeaseOperation +) type HA struct { ourUUID uuid.UUID @@ -141,7 +137,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D everySecond := time.NewTicker(time.Second) defer everySecond.Stop() - var nextAction responsibilityAction + var nextAction = 0 var theirUUID uuid.UUID // Even if Icinga 2 is offline now, Redis may be filled @@ -161,7 +157,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D continue } - nextAction = tryTakeover + nextAction = action_TryTakeover case resp_TakeoverNoSync: if !h.icinga2IsAlive() { log.WithFields(log.Fields{ @@ -174,7 +170,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D continue } - nextAction = tryTakeover + nextAction = action_TryTakeover case resp_TakeoverSync: if !h.icinga2IsAlive() { log.WithFields(log.Fields{ @@ -187,12 +183,12 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D continue } - nextAction = doTakeover + nextAction = action_DoTakeover case resp_Stop: if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { - nextAction = ceaseOperation + nextAction = action_CeaseOperation } else { - nextAction = doTakeover + nextAction = action_DoTakeover } case resp_NotReadyForTakeover: if h.icinga2IsAlive() { @@ -206,13 +202,13 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D continue } - nextAction = noAction + nextAction = action_NoAction } switch nextAction { - case noAction: + case action_NoAction: break - case tryTakeover, doTakeover: + case action_TryTakeover, action_DoTakeover: var justTakenOver bool errTx := dbw.SqlTransactionQuiet(true, true, func(tx *sql.Tx) error { @@ -351,7 +347,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "their_uuid": theirUUID.String(), }).Info("Other instance is responsible") } - case ceaseOperation: + case action_CeaseOperation: errTx := dbw.SqlTransactionQuiet(true, true, func(tx *sql.Tx) error { rows, errFA := dbw.SqlFetchAllQuiet( tx, From 6dceb4dab3d613ce8618a6c6e9ecde42b72868f6 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 15:21:11 +0100 Subject: [PATCH 08/39] Add heartbeat broker --- heartbeat.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 heartbeat.go diff --git a/heartbeat.go b/heartbeat.go new file mode 100644 index 00000000..3bb7134e --- /dev/null +++ b/heartbeat.go @@ -0,0 +1,48 @@ +package icingadb_ha_lib + +import ( + "crypto/sha1" + "encoding/json" + "git.icinga.com/icingadb-connection" + log "github.com/sirupsen/logrus" + ) + +// Compute SHA1 +func Sha1bytes(bytes []byte) []byte { + hash := sha1.New() + hash.Write(bytes) + return hash.Sum(nil) +} + + +func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { + log.Info("Starting Events broker") + + subscription := rdb.Rdb.Subscribe() + defer subscription.Close(); + + + if err := subscription.Subscribe( + "icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil { + chErr <- err + } + + for { + msg, err := subscription.ReceiveMessage() + if err != nil { + chErr <- err + } + + switch msg.Channel { + case "icinga:stats": + var unJson interface{} = nil + if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil { + chErr <- err + } + + environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) + env := &icingadb_connection.Environment{Name: environment, ID: Sha1bytes([]byte(environment))} + chEnv <- env + } + } +} From b4fa95d5136cb7dc7f44f0ecbf47ef39c8fe6b25 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 16:21:40 +0100 Subject: [PATCH 09/39] Fix tests --- ha_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ha_test.go b/ha_test.go index 73e55c43..4c562a98 100644 --- a/ha_test.go +++ b/ha_test.go @@ -7,10 +7,10 @@ import ( ) func TestHA_setResponsibility(t *testing.T) { - responsibilities := [5]responsibility{ readyForTakeover, TakeoverNoSync, TakeoverSync, stop } + responsibilities := [6]uint32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover } h := new(HA) - previous := responsibility(0) + previous := uint32(0) for _,r := range responsibilities { assert.Equal(t, previous, h.setResponsibility(r), "Should be equal") previous = r @@ -19,9 +19,9 @@ func TestHA_setResponsibility(t *testing.T) { func TestHA_IsResponsible(t *testing.T) { h := new(HA) - h.setResponsibility(TakeoverSync) + h.setResponsibility(resp_TakeoverSync) assert.True(t, h.IsResponsible(), "Should be responsible") - h.setResponsibility(TakeoverNoSync) + h.setResponsibility(resp_TakeoverNoSync) assert.False(t, h.IsResponsible(), "Should not be responsible") } From 1d78029e0da6a500621b0e5e01e89fcf44cea282 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 16:23:18 +0100 Subject: [PATCH 10/39] Have heartbeat return error --- heartbeat.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/heartbeat.go b/heartbeat.go index 3bb7134e..dafbd8d8 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -15,7 +15,7 @@ func Sha1bytes(bytes []byte) []byte { } -func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { +func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingadb_connection.Environment) error { log.Info("Starting Events broker") subscription := rdb.Rdb.Subscribe() @@ -24,20 +24,20 @@ func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingad if err := subscription.Subscribe( "icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil { - chErr <- err + return err } for { msg, err := subscription.ReceiveMessage() if err != nil { - chErr <- err + return err } switch msg.Channel { case "icinga:stats": var unJson interface{} = nil if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil { - chErr <- err + return err } environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) From 16e6816c0f903b5b429f5c7cd058887aab907fca Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 17:06:09 +0100 Subject: [PATCH 11/39] Code style --- ha.go | 6 +++--- heartbeat.go | 5 +---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/ha.go b/ha.go index fbedb290..14747c71 100644 --- a/ha.go +++ b/ha.go @@ -123,9 +123,9 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D return nil } - var errNR error - if h.ourUUID, errNR = uuid.NewRandom(); errNR != nil { - return errNR + var err error + if h.ourUUID, err = uuid.NewRandom(); err != nil { + return err } log.WithFields(log.Fields{ diff --git a/heartbeat.go b/heartbeat.go index dafbd8d8..61a5a661 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -5,7 +5,7 @@ import ( "encoding/json" "git.icinga.com/icingadb-connection" log "github.com/sirupsen/logrus" - ) +) // Compute SHA1 func Sha1bytes(bytes []byte) []byte { @@ -14,14 +14,11 @@ func Sha1bytes(bytes []byte) []byte { return hash.Sum(nil) } - func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingadb_connection.Environment) error { log.Info("Starting Events broker") subscription := rdb.Rdb.Subscribe() defer subscription.Close(); - - if err := subscription.Subscribe( "icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil { return err From 3ad6315b3aab3ab317519fc7282db7090549d584 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Feb 2019 17:06:19 +0100 Subject: [PATCH 12/39] Add heartbeat test --- heartbeat_test.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 heartbeat_test.go diff --git a/heartbeat_test.go b/heartbeat_test.go new file mode 100644 index 00000000..bacc5437 --- /dev/null +++ b/heartbeat_test.go @@ -0,0 +1,48 @@ +package icingadb_ha_lib + +import ( + "encoding/json" + "git.icinga.com/icingadb-connection" + "github.com/go-redis/redis" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var icingastate = "{\"IcingaApplication\":" + + "{\"status\": " + + "{\"icingaapplication\":" + + "{\"app\":{" + + "\"environment\": \"\"" + + "}}}}}" + +func TestIcingaEventsBroker(t *testing.T) { + rd := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + DialTimeout: time.Minute / 2, + ReadTimeout: time.Minute, + WriteTimeout: time.Minute, + }) + + rdb := icingadb_connection.NewRDBWrapper(rd) + + chEnv := make(chan *icingadb_connection.Environment) + + go func() { + err := IcingaEventsBroker(rdb, chEnv) + assert.NoError(t, err, "redis connection error") + }() + + time.Sleep(time.Second * 2) + + var uj interface{} = nil + if err := json.Unmarshal([]byte(icingastate), &uj); err != nil { + assert.Nil(t, err) + } + + rdb.Rdb.Publish("icinga:stats", icingastate) + + env := <-chEnv + + assert.NotNil(t, env.ID, "no valid env received") +} From be85da42506fb2d3b3a25f85b080fa605445e65f Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 25 Feb 2019 14:29:49 +0100 Subject: [PATCH 13/39] Update libs --- ha.go | 16 ++++++++-------- heartbeat.go | 2 +- heartbeat_test.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ha.go b/ha.go index 14747c71..d1666bdb 100644 --- a/ha.go +++ b/ha.go @@ -2,7 +2,7 @@ package icingadb_ha_lib import ( "database/sql" - "git.icinga.com/icingadb-connection" + "git.icinga.com/icingadb/icingadb-connection-lib" "github.com/google/uuid" log "github.com/sirupsen/logrus" "sync/atomic" @@ -100,7 +100,7 @@ func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") errTx := dbw.SqlTransaction(true, true, func(tx *sql.Tx) error { - _, errExec := dbw.SqlExec( + _, errExec := dbw.SqlExecTx( tx, "delete from icingadb_instance by heartbeat", `DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`, @@ -224,7 +224,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D } if len(rows) > 0 { - _, errExec := dbw.SqlExecQuiet( + _, errExec := dbw.SqlExecTxQuiet( tx, "update icingadb_instance by id", `UPDATE icingadb_instance SET environment_id=?, heartbeat=? WHERE id = ?`, @@ -236,7 +236,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D return errExec } } else { - _, errExec := dbw.SqlExecQuiet( + _, errExec := dbw.SqlExecTxQuiet( tx, "insert into icingadb_instance", `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, @@ -271,7 +271,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D justTakenOver = true } else if time.Now().Unix()-rows[0][1].(int64) >= 10 { { - _, errExec := dbw.SqlExecQuiet( + _, errExec := dbw.SqlExecTxQuiet( tx, "update icingadb_instance by environment_id", `UPDATE icingadb_instance SET responsible=? WHERE environment_id = ?`, @@ -283,7 +283,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D } } - _, errExec := dbw.SqlExecQuiet( + _, errExec := dbw.SqlExecTxQuiet( tx, "update icingadb_instance by id", `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, @@ -297,7 +297,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D justTakenOver = true } } else { - _, errExec := dbw.SqlExecQuiet( + _, errExec := dbw.SqlExecTxQuiet( tx, "update icingadb_instance by id", `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, @@ -362,7 +362,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D } if len(rows) > 0 { - _, errExec := dbw.SqlExecQuiet( + _, errExec := dbw.SqlExecTxQuiet( tx, "update icingadb_instance", `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, diff --git a/heartbeat.go b/heartbeat.go index 61a5a661..7a958881 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -3,7 +3,7 @@ package icingadb_ha_lib import ( "crypto/sha1" "encoding/json" - "git.icinga.com/icingadb-connection" + "git.icinga.com/icingadb/icingadb-connection-lib" log "github.com/sirupsen/logrus" ) diff --git a/heartbeat_test.go b/heartbeat_test.go index bacc5437..0dca805c 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -2,7 +2,7 @@ package icingadb_ha_lib import ( "encoding/json" - "git.icinga.com/icingadb-connection" + "git.icinga.com/icingadb/icingadb-connection-lib" "github.com/go-redis/redis" "github.com/stretchr/testify/assert" "testing" From 22253621fab5dd2f48ed11a4d638139ad8e2ade5 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 25 Feb 2019 16:28:42 +0100 Subject: [PATCH 14/39] Fix redis use --- heartbeat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat.go b/heartbeat.go index 7a958881..73103a1c 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -17,7 +17,7 @@ func Sha1bytes(bytes []byte) []byte { func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingadb_connection.Environment) error { log.Info("Starting Events broker") - subscription := rdb.Rdb.Subscribe() + subscription := rdb.Subscribe() defer subscription.Close(); if err := subscription.Subscribe( "icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil { From ab777d470ec3efac13fc14ddf962be79b08448e5 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 25 Feb 2019 16:29:08 +0100 Subject: [PATCH 15/39] Add clean up test --- ha_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/ha_test.go b/ha_test.go index 4c562a98..b45997de 100644 --- a/ha_test.go +++ b/ha_test.go @@ -1,11 +1,16 @@ package icingadb_ha_lib import ( + "git.icinga.com/icingadb/icingadb-connection-lib" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "testing" "time" ) +var testID, _ = uuid.FromBytes(make([]byte, 16)) +var testEnv = make([]byte, 20) + func TestHA_setResponsibility(t *testing.T) { responsibilities := [6]uint32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover } h := new(HA) @@ -31,4 +36,49 @@ func TestHA_icinga2IsAlive(t *testing.T) { assert.True(t, h.icinga2IsAlive(), "Should be alive") h.icinga2MTime = h.icinga2MTime - 15 assert.False(t, h.icinga2IsAlive(), "Should be dead") +} + +func Test_cleanUpInstances(t *testing.T) { + var dbw, err = icingadb_connection.NewDBWrapper( + "mysql", + "root:foo@tcp(127.0.0.1:3306)/icingadb?" + + "innodb_strict_mode=1&sql_mode='STRICT_ALL_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,NO_ENGINE_SUBSTITUTION,PIPES_AS_CONCAT,ANSI_QUOTES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER'") + assert.NoError(t, err, "SQL error") + + _, err = dbw.SqlExec( + "insert into icingadb_instance", + `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, + testID[:], + testEnv, + time.Now().Unix() - 25, + "n", + ) + + assert.NoError(t, err, "SQL error") + + rows, err := dbw.SqlFetchAll(dbw.Db, "", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) + + assert.Equal(t, 1, len(rows)) + + err = cleanUpInstances(dbw) + + assert.NoError(t, err, "Clean up failed") + + rows, err = dbw.SqlFetchAll(dbw.Db, "", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) + + assert.NoError(t, err, "SQL error") + + assert.Equal(t, 1, len(rows)) + + time.Sleep(time.Second * 5) + + err = cleanUpInstances(dbw) + + assert.NoError(t, err, "Clean up failed") + + rows, err = dbw.SqlFetchAll(dbw.Db, "", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) + + assert.NoError(t, err, "SQL error") + + assert.Equal(t, 0, len(rows)) } \ No newline at end of file From 789545f2578b2354088cc731b6275971a4126b70 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 26 Feb 2019 09:23:12 +0100 Subject: [PATCH 16/39] Rename package --- ha.go | 2 +- ha_test.go | 2 +- heartbeat.go | 2 +- heartbeat_test.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ha.go b/ha.go index d1666bdb..bc14d3d4 100644 --- a/ha.go +++ b/ha.go @@ -1,4 +1,4 @@ -package icingadb_ha_lib +package icingadb_ha import ( "database/sql" diff --git a/ha_test.go b/ha_test.go index b45997de..b8399cdb 100644 --- a/ha_test.go +++ b/ha_test.go @@ -1,4 +1,4 @@ -package icingadb_ha_lib +package icingadb_ha import ( "git.icinga.com/icingadb/icingadb-connection-lib" diff --git a/heartbeat.go b/heartbeat.go index 73103a1c..6d3b805b 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -1,4 +1,4 @@ -package icingadb_ha_lib +package icingadb_ha import ( "crypto/sha1" diff --git a/heartbeat_test.go b/heartbeat_test.go index 0dca805c..3ab4a93b 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -1,4 +1,4 @@ -package icingadb_ha_lib +package icingadb_ha import ( "encoding/json" From 924fb4b9535dbc135cb564622b4b6423065b3308 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 26 Feb 2019 13:28:39 +0100 Subject: [PATCH 17/39] Update with connection --- ha.go | 13 ++++++------- ha_test.go | 6 +++--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/ha.go b/ha.go index bc14d3d4..40e2450e 100644 --- a/ha.go +++ b/ha.go @@ -1,7 +1,6 @@ package icingadb_ha import ( - "database/sql" "git.icinga.com/icingadb/icingadb-connection-lib" "github.com/google/uuid" log "github.com/sirupsen/logrus" @@ -99,7 +98,7 @@ func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") - errTx := dbw.SqlTransaction(true, true, func(tx *sql.Tx) error { + errTx := dbw.SqlTransaction(true, true, false, func(tx icingadb_connection.DbTransaction) error { _, errExec := dbw.SqlExecTx( tx, "delete from icingadb_instance by heartbeat", @@ -211,9 +210,9 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D case action_TryTakeover, action_DoTakeover: var justTakenOver bool - errTx := dbw.SqlTransactionQuiet(true, true, func(tx *sql.Tx) error { + errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error { { - rows, errFA := dbw.SqlFetchAllQuiet( + rows, errFA := dbw.SqlFetchAllTxQuiet( tx, "select from icingadb_instance by id", `SELECT 1 FROM icingadb_instance WHERE id = ?`, @@ -253,7 +252,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D justTakenOver = false - rows, errFA := dbw.SqlFetchAllQuiet( + rows, errFA := dbw.SqlFetchAllTxQuiet( tx, "select from icingadb_instance by environment_id, responsible", `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ?`, @@ -348,8 +347,8 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D }).Info("Other instance is responsible") } case action_CeaseOperation: - errTx := dbw.SqlTransactionQuiet(true, true, func(tx *sql.Tx) error { - rows, errFA := dbw.SqlFetchAllQuiet( + errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error { + rows, errFA := dbw.SqlFetchAllTxQuiet( tx, "select from icingadb_instance by environment_id, responsible, heartbeat", `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND ? - heartbeat < 10`, diff --git a/ha_test.go b/ha_test.go index b8399cdb..ae0fccfd 100644 --- a/ha_test.go +++ b/ha_test.go @@ -56,7 +56,7 @@ func Test_cleanUpInstances(t *testing.T) { assert.NoError(t, err, "SQL error") - rows, err := dbw.SqlFetchAll(dbw.Db, "", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) + rows, err := dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) assert.Equal(t, 1, len(rows)) @@ -64,7 +64,7 @@ func Test_cleanUpInstances(t *testing.T) { assert.NoError(t, err, "Clean up failed") - rows, err = dbw.SqlFetchAll(dbw.Db, "", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) + rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) assert.NoError(t, err, "SQL error") @@ -76,7 +76,7 @@ func Test_cleanUpInstances(t *testing.T) { assert.NoError(t, err, "Clean up failed") - rows, err = dbw.SqlFetchAll(dbw.Db, "", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) + rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) assert.NoError(t, err, "SQL error") From 2ccabb251776e02734a20ba7831048fe9801157a Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 26 Feb 2019 14:24:18 +0100 Subject: [PATCH 18/39] Store env with HA object --- ha.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/ha.go b/ha.go index 40e2450e..ebb011fa 100644 --- a/ha.go +++ b/ha.go @@ -34,6 +34,7 @@ const ( type HA struct { ourUUID uuid.UUID + ourEnv *icingadb_connection.Environment icinga2MTime int64 // responsibility tells whether we're responsible for our environment. responsibility uint32 @@ -114,10 +115,8 @@ func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") - var env *icingadb_connection.Environment = nil var hasEnv bool - - env, hasEnv = <-chEnv + h.ourEnv, hasEnv = <-chEnv if !hasEnv { return nil } @@ -130,7 +129,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", "uuid": h.ourUUID.String(), - "env": env.Name, + "env": h.ourEnv.Name, }).Info("Received environment from Icinga 2") everySecond := time.NewTicker(time.Second) @@ -149,7 +148,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", "uuid": h.ourUUID.String(), - "env": env.Name, + "env": h.ourEnv.Name, }).Warn("Icinga 2 detected as not running, stopping.") h.setResponsibility(resp_NotReadyForTakeover) @@ -162,7 +161,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", "uuid": h.ourUUID.String(), - "env": env.Name, + "env": h.ourEnv.Name, }).Warn("Icinga 2 detected as not running, stopping.") h.setResponsibility(resp_Stop) @@ -175,7 +174,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", "uuid": h.ourUUID.String(), - "env": env.Name, + "env": h.ourEnv.Name, }).Warn("Icinga 2 detected as not running, stopping.") h.setResponsibility(resp_Stop) @@ -194,7 +193,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", "uuid": h.ourUUID.String(), - "env": env.Name, + "env": h.ourEnv.Name, }).Info("Icinga 2 detected as running again.") h.setResponsibility(resp_ReadyForTakeover) @@ -227,7 +226,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D tx, "update icingadb_instance by id", `UPDATE icingadb_instance SET environment_id=?, heartbeat=? WHERE id = ?`, - env.ID, + h.ourEnv.ID, time.Now().Unix(), h.ourUUID[:], ) @@ -240,7 +239,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "insert into icingadb_instance", `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, h.ourUUID[:], - env.ID, + h.ourEnv.ID, time.Now().Unix(), "n", ) @@ -256,7 +255,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D tx, "select from icingadb_instance by environment_id, responsible", `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ?`, - env.ID, + h.ourEnv.ID, "y", ) if errFA != nil { @@ -275,7 +274,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "update icingadb_instance by environment_id", `UPDATE icingadb_instance SET responsible=? WHERE environment_id = ?`, "n", - env.ID, + h.ourEnv.ID, ) if errExec != nil { return errExec @@ -327,7 +326,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D if h.setResponsibility(resp_TakeoverSync) == resp_TakeoverNoSync { log.WithFields(log.Fields{ "context": "HA", - "env": env.Name, + "env": h.ourEnv.Name, "their_uuid": theirUUID.String(), }).Info("Taking over") } @@ -342,7 +341,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D if !justTakenOver { log.WithFields(log.Fields{ "context": "HA", - "env": env.Name, + "env": h.ourEnv.Name, "their_uuid": theirUUID.String(), }).Info("Other instance is responsible") } @@ -352,7 +351,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D tx, "select from icingadb_instance by environment_id, responsible, heartbeat", `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND ? - heartbeat < 10`, - env.ID, + h.ourEnv.ID, "n", time.Now().Unix(), ) @@ -380,7 +379,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", - "env": env.Name, + "env": h.ourEnv.Name, }).Info("Other instance is responsible. Ceasing operations.") h.responsibleSince = time.Time{} @@ -388,7 +387,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D } select { - case env, hasEnv = <-chEnv: + case h.ourEnv, hasEnv = <-chEnv: if !hasEnv { return nil } From 640d8132a2d4aed93132777f07cb7533d62c8017 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 26 Feb 2019 14:28:38 +0100 Subject: [PATCH 19/39] Refactor handle responsibility --- ha.go | 129 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/ha.go b/ha.go index ebb011fa..443fa395 100644 --- a/ha.go +++ b/ha.go @@ -112,6 +112,72 @@ func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { return errTx } +func (h *HA) handleResponsibility() (cont bool, nextAction int) { + switch h.getResponsibility() { + case resp_ReadyForTakeover: + if !h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": h.ourEnv.Name, + }).Warn("Icinga 2 detected as not running, stopping.") + + h.setResponsibility(resp_NotReadyForTakeover) + cont = true + } + + nextAction = action_TryTakeover + case resp_TakeoverNoSync: + if !h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": h.ourEnv.Name, + }).Warn("Icinga 2 detected as not running, stopping.") + + h.setResponsibility(resp_Stop) + cont = true + } + + nextAction = action_TryTakeover + case resp_TakeoverSync: + if !h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": h.ourEnv.Name, + }).Warn("Icinga 2 detected as not running, stopping.") + + h.setResponsibility(resp_Stop) + cont = true + } + + nextAction = action_DoTakeover + case resp_Stop: + if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { + nextAction = action_CeaseOperation + } else { + nextAction = action_DoTakeover + } + cont = false + case resp_NotReadyForTakeover: + if h.icinga2IsAlive() { + log.WithFields(log.Fields{ + "context": "HA", + "uuid": h.ourUUID.String(), + "env": h.ourEnv.Name, + }).Info("Icinga 2 detected as running again.") + + h.setResponsibility(resp_ReadyForTakeover) + cont = true + } + + nextAction = action_NoAction + } + + return +} + func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") @@ -135,72 +201,15 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D everySecond := time.NewTicker(time.Second) defer everySecond.Stop() - var nextAction = 0 var theirUUID uuid.UUID // Even if Icinga 2 is offline now, Redis may be filled h.Icinga2HeartBeat() for { - switch h.getResponsibility() { - case resp_ReadyForTakeover: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_NotReadyForTakeover) - continue - } - - nextAction = action_TryTakeover - case resp_TakeoverNoSync: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_Stop) - continue - } - - nextAction = action_TryTakeover - case resp_TakeoverSync: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_Stop) - continue - } - - nextAction = action_DoTakeover - case resp_Stop: - if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { - nextAction = action_CeaseOperation - } else { - nextAction = action_DoTakeover - } - case resp_NotReadyForTakeover: - if h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Info("Icinga 2 detected as running again.") - - h.setResponsibility(resp_ReadyForTakeover) - continue - } - - nextAction = action_NoAction + cont, nextAction := h.handleResponsibility() + if cont { + continue } switch nextAction { From 26a0211054e81d4bb30998eaf83983408ca43264 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 26 Feb 2019 14:58:49 +0100 Subject: [PATCH 20/39] Use easier types --- ha.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ha.go b/ha.go index 443fa395..0ca24722 100644 --- a/ha.go +++ b/ha.go @@ -37,7 +37,7 @@ type HA struct { ourEnv *icingadb_connection.Environment icinga2MTime int64 // responsibility tells whether we're responsible for our environment. - responsibility uint32 + responsibility int32 // responsibleSince tells since when we're responsible for our environment. responsibleSince time.Time // runningCriticalOperations counts the currently running critical operations. @@ -414,11 +414,11 @@ func (h *HA) icinga2IsAlive() bool { } // getResponsibility gets the responsibility. -func (h *HA) getResponsibility() uint32 { - return atomic.LoadUint32(&h.responsibility) +func (h *HA) getResponsibility() int { + return int(atomic.LoadInt32(&h.responsibility)) } // setResponsibility sets the responsibility and returns the previous one. -func (h *HA) setResponsibility(r uint32) uint32 { - return atomic.SwapUint32(&h.responsibility, uint32(r)) +func (h *HA) setResponsibility(r int32) int32 { + return atomic.SwapInt32(&h.responsibility, r) } From 400c9b28c0185a6c6969cd0cd7719dc297ce708f Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 26 Feb 2019 15:00:06 +0100 Subject: [PATCH 21/39] Add repsonsibility test --- ha_test.go | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/ha_test.go b/ha_test.go index ae0fccfd..ad5053ed 100644 --- a/ha_test.go +++ b/ha_test.go @@ -12,10 +12,10 @@ var testID, _ = uuid.FromBytes(make([]byte, 16)) var testEnv = make([]byte, 20) func TestHA_setResponsibility(t *testing.T) { - responsibilities := [6]uint32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover } + responsibilities := [6]int32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover } h := new(HA) - previous := uint32(0) + previous := int32(0) for _,r := range responsibilities { assert.Equal(t, previous, h.setResponsibility(r), "Should be equal") previous = r @@ -38,6 +38,39 @@ func TestHA_icinga2IsAlive(t *testing.T) { assert.False(t, h.icinga2IsAlive(), "Should be dead") } +func TestHA_handleResponsibility(t *testing.T) { + h := new(HA) + h.ourEnv = &icingadb_connection.Environment{make([]byte, 20), "test"} + h.setResponsibility(resp_ReadyForTakeover) + var cont bool + var na int + + print(h.icinga2MTime) + + cont, na = h.handleResponsibility() + assert.True(t, cont) + assert.Equal(t, action_TryTakeover, na) + assert.Equal(t, resp_NotReadyForTakeover, h.getResponsibility()) + + //AWAKEN + h.icinga2MTime = time.Now().Unix() + cont, na = h.handleResponsibility() + assert.True(t, cont) + assert.Equal(t, resp_ReadyForTakeover, h.getResponsibility()) + + h.setResponsibility(resp_TakeoverSync) + cont, na = h.handleResponsibility() + assert.False(t, cont) + assert.Equal(t, action_DoTakeover, na) + assert.Equal(t, resp_TakeoverSync, h.getResponsibility()) + + //SLEEP + h.icinga2MTime = 0 + cont, na = h.handleResponsibility() + assert.True(t, cont) + assert.Equal(t, na, action_DoTakeover) +} + func Test_cleanUpInstances(t *testing.T) { var dbw, err = icingadb_connection.NewDBWrapper( "mysql", From 3296e36f8ec42e33d92927b6554aaf8a1a68d8b7 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Mon, 4 Mar 2019 13:59:40 +0100 Subject: [PATCH 22/39] Change tests to be compatible with icingadb-connection --- ha_test.go | 2 +- heartbeat_test.go | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/ha_test.go b/ha_test.go index ad5053ed..5924a1c2 100644 --- a/ha_test.go +++ b/ha_test.go @@ -74,7 +74,7 @@ func TestHA_handleResponsibility(t *testing.T) { func Test_cleanUpInstances(t *testing.T) { var dbw, err = icingadb_connection.NewDBWrapper( "mysql", - "root:foo@tcp(127.0.0.1:3306)/icingadb?" + + "module-dev:icinga0815!@tcp(127.0.0.1:3306)/icingadb?" + "innodb_strict_mode=1&sql_mode='STRICT_ALL_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,NO_ENGINE_SUBSTITUTION,PIPES_AS_CONCAT,ANSI_QUOTES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER'") assert.NoError(t, err, "SQL error") diff --git a/heartbeat_test.go b/heartbeat_test.go index 3ab4a93b..190e1dad 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -3,7 +3,6 @@ package icingadb_ha import ( "encoding/json" "git.icinga.com/icingadb/icingadb-connection-lib" - "github.com/go-redis/redis" "github.com/stretchr/testify/assert" "testing" "time" @@ -17,14 +16,10 @@ var icingastate = "{\"IcingaApplication\":" + "}}}}}" func TestIcingaEventsBroker(t *testing.T) { - rd := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1:6379", - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) - - rdb := icingadb_connection.NewRDBWrapper(rd) + rdb, err := icingadb_connection.NewRDBWrapper("127.0.0.1:6379") + if err != nil { + t.Fatal("This test needs a working Redis connection") + } chEnv := make(chan *icingadb_connection.Environment) From 42091460c2e1c3a5590ab7e732edf68158970caa Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Tue, 5 Mar 2019 17:09:53 +0100 Subject: [PATCH 23/39] Fix imports --- ha.go | 2 +- heartbeat.go | 2 +- heartbeat_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ha.go b/ha.go index 0ca24722..f7986665 100644 --- a/ha.go +++ b/ha.go @@ -1,7 +1,7 @@ package icingadb_ha import ( - "git.icinga.com/icingadb/icingadb-connection-lib" + "git.icinga.com/icingadb/icingadb-connection" "github.com/google/uuid" log "github.com/sirupsen/logrus" "sync/atomic" diff --git a/heartbeat.go b/heartbeat.go index 6d3b805b..9d63ef7a 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -3,7 +3,7 @@ package icingadb_ha import ( "crypto/sha1" "encoding/json" - "git.icinga.com/icingadb/icingadb-connection-lib" + "git.icinga.com/icingadb/icingadb-connection" log "github.com/sirupsen/logrus" ) diff --git a/heartbeat_test.go b/heartbeat_test.go index 190e1dad..616d24e8 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -2,7 +2,7 @@ package icingadb_ha import ( "encoding/json" - "git.icinga.com/icingadb/icingadb-connection-lib" + "git.icinga.com/icingadb/icingadb-connection" "github.com/stretchr/testify/assert" "testing" "time" From e9f3805046c062f6c4924ded12adcc9ae04577ea Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Wed, 6 Mar 2019 09:10:06 +0100 Subject: [PATCH 24/39] Remove semicolon --- heartbeat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat.go b/heartbeat.go index 9d63ef7a..52214846 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -18,7 +18,7 @@ func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingad log.Info("Starting Events broker") subscription := rdb.Subscribe() - defer subscription.Close(); + defer subscription.Close() if err := subscription.Subscribe( "icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil { return err From 4a03590d3bebf50ab2b11ad7696206518c03afc9 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Thu, 7 Mar 2019 17:20:25 +0100 Subject: [PATCH 25/39] Fix libs --- ha_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ha_test.go b/ha_test.go index 5924a1c2..a076b513 100644 --- a/ha_test.go +++ b/ha_test.go @@ -1,7 +1,7 @@ package icingadb_ha import ( - "git.icinga.com/icingadb/icingadb-connection-lib" + "git.icinga.com/icingadb/icingadb-connection" "github.com/google/uuid" "github.com/stretchr/testify/assert" "testing" @@ -73,9 +73,7 @@ func TestHA_handleResponsibility(t *testing.T) { func Test_cleanUpInstances(t *testing.T) { var dbw, err = icingadb_connection.NewDBWrapper( - "mysql", - "module-dev:icinga0815!@tcp(127.0.0.1:3306)/icingadb?" + - "innodb_strict_mode=1&sql_mode='STRICT_ALL_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,NO_ENGINE_SUBSTITUTION,PIPES_AS_CONCAT,ANSI_QUOTES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER'") + "module-dev:icinga0815!@tcp(127.0.0.1:3306)/icingadb?") assert.NoError(t, err, "SQL error") _, err = dbw.SqlExec( From 0682265a0fdd7cc121d505796d31549e82f49fdb Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Thu, 7 Mar 2019 17:20:38 +0100 Subject: [PATCH 26/39] Move Environment to HA --- ha.go | 6 +++--- heartbeat.go | 12 ++++++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/ha.go b/ha.go index f7986665..34cfb665 100644 --- a/ha.go +++ b/ha.go @@ -34,7 +34,7 @@ const ( type HA struct { ourUUID uuid.UUID - ourEnv *icingadb_connection.Environment + ourEnv *Environment icinga2MTime int64 // responsibility tells whether we're responsible for our environment. responsibility int32 @@ -71,7 +71,7 @@ func (h *HA) IsResponsible() bool { return h.getResponsibility() == resp_TakeoverSync } -func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment, chErr chan error) { +func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment, chErr chan error) { go cleanUpInstancesAsync(dbw, chErr) if errRun := h.run(rdb, dbw, chEnv); errRun != nil { @@ -178,7 +178,7 @@ func (h *HA) handleResponsibility() (cont bool, nextAction int) { return } -func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *icingadb_connection.Environment) error { +func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment) error { log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") var hasEnv bool diff --git a/heartbeat.go b/heartbeat.go index 52214846..8b640c57 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -7,6 +7,13 @@ import ( log "github.com/sirupsen/logrus" ) + +type Environment struct { + ID []byte + Name string + configDumpInProgress bool +} + // Compute SHA1 func Sha1bytes(bytes []byte) []byte { hash := sha1.New() @@ -14,7 +21,7 @@ func Sha1bytes(bytes []byte) []byte { return hash.Sum(nil) } -func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingadb_connection.Environment) error { +func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *Environment) error { log.Info("Starting Events broker") subscription := rdb.Subscribe() @@ -38,7 +45,8 @@ func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *icingad } environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) - env := &icingadb_connection.Environment{Name: environment, ID: Sha1bytes([]byte(environment))} + configDumpInProgress := unJson.(map[string]interface{})["config_dump_in_progress"].(bool) + env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), configDumpInProgress: configDumpInProgress} chEnv <- env } } From 84746c2ebfa1244d364f6ecba221f2be85e96ce5 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 8 Mar 2019 10:56:29 +0100 Subject: [PATCH 27/39] Eat env and print id --- ha.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ha.go b/ha.go index 34cfb665..88229c1d 100644 --- a/ha.go +++ b/ha.go @@ -34,7 +34,7 @@ const ( type HA struct { ourUUID uuid.UUID - ourEnv *Environment + ourEnv *Environment icinga2MTime int64 // responsibility tells whether we're responsible for our environment. responsibility int32 @@ -195,7 +195,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, + "env": h.ourEnv.ID, }).Info("Received environment from Icinga 2") everySecond := time.NewTicker(time.Second) @@ -335,7 +335,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D if h.setResponsibility(resp_TakeoverSync) == resp_TakeoverNoSync { log.WithFields(log.Fields{ "context": "HA", - "env": h.ourEnv.Name, + "env": h.ourEnv.ID, "their_uuid": theirUUID.String(), }).Info("Taking over") } @@ -350,7 +350,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D if !justTakenOver { log.WithFields(log.Fields{ "context": "HA", - "env": h.ourEnv.Name, + "env": h.ourEnv.ID, "their_uuid": theirUUID.String(), }).Info("Other instance is responsible") } @@ -388,7 +388,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D log.WithFields(log.Fields{ "context": "HA", - "env": h.ourEnv.Name, + "env": h.ourEnv.ID, }).Info("Other instance is responsible. Ceasing operations.") h.responsibleSince = time.Time{} @@ -401,6 +401,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D return nil } + h.Icinga2HeartBeat() <-everySecond.C case <-everySecond.C: break From 51605d4b2addda2006f05f85506bdab763314456 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Fri, 8 Mar 2019 16:40:58 +0100 Subject: [PATCH 28/39] Add NotificationListener system --- ha.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/ha.go b/ha.go index 88229c1d..ff092e81 100644 --- a/ha.go +++ b/ha.go @@ -32,6 +32,11 @@ const ( action_CeaseOperation ) +const ( + Notify_IsResponsible = iota + Notify_IsNotResponsible +) + type HA struct { ourUUID uuid.UUID ourEnv *Environment @@ -44,6 +49,7 @@ type HA struct { runningCriticalOperations uint64 // lastCriticalOperationEnd tells when the last critical operation finished. lastCriticalOperationEnd int64 + notificationListeners []chan int } // RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible. @@ -149,6 +155,7 @@ func (h *HA) handleResponsibility() (cont bool, nextAction int) { }).Warn("Icinga 2 detected as not running, stopping.") h.setResponsibility(resp_Stop) + h.notifyNotificationListener(Notify_IsNotResponsible) cont = true } @@ -338,6 +345,7 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": h.ourEnv.ID, "their_uuid": theirUUID.String(), }).Info("Taking over") + h.notifyNotificationListener(Notify_IsResponsible) } if _, errRP := rdb.Publish("icingadb:wakeup", h.ourUUID.String()).Result(); errRP != nil { @@ -423,3 +431,17 @@ func (h *HA) getResponsibility() int { func (h *HA) setResponsibility(r int32) int32 { return atomic.SwapInt32(&h.responsibility, r) } + +func (h *HA) RegisterNotificationListener() chan int { + ch := make(chan int) + h.notificationListeners = append(h.notificationListeners, ch) + return ch +} + +func (h *HA) notifyNotificationListener(msg int) { + for _, c := range h.notificationListeners { + go func() { + c <- msg + }() + } +} \ No newline at end of file From 44d229838ee7929e7d4a64d12c9181baaa5db8b6 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Wed, 20 Mar 2019 14:02:57 +0100 Subject: [PATCH 29/39] Fix notification system --- ha.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ha.go b/ha.go index ff092e81..b4944c1b 100644 --- a/ha.go +++ b/ha.go @@ -440,8 +440,8 @@ func (h *HA) RegisterNotificationListener() chan int { func (h *HA) notifyNotificationListener(msg int) { for _, c := range h.notificationListeners { - go func() { - c <- msg - }() + go func(ch chan int) { + ch <- msg + }(c) } } \ No newline at end of file From 98cbc9e202f0893f20cab12ed972071a9ee49471 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Wed, 20 Mar 2019 15:14:05 +0100 Subject: [PATCH 30/39] Do not start config sync during config dump --- ha.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ha.go b/ha.go index b4944c1b..bb4366d2 100644 --- a/ha.go +++ b/ha.go @@ -33,8 +33,8 @@ const ( ) const ( - Notify_IsResponsible = iota - Notify_IsNotResponsible + Notify_StartSync = iota + Notify_StopSync ) type HA struct { @@ -155,7 +155,7 @@ func (h *HA) handleResponsibility() (cont bool, nextAction int) { }).Warn("Icinga 2 detected as not running, stopping.") h.setResponsibility(resp_Stop) - h.notifyNotificationListener(Notify_IsNotResponsible) + h.notifyNotificationListener(Notify_StopSync) cont = true } @@ -345,7 +345,12 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": h.ourEnv.ID, "their_uuid": theirUUID.String(), }).Info("Taking over") - h.notifyNotificationListener(Notify_IsResponsible) + + // TODO: This should not be done here, but on configDumpInProgress changes. + // It's only possible to do it here, because we always lose responsibility during config dump once + if !h.ourEnv.configDumpInProgress { + h.notifyNotificationListener(Notify_StartSync) + } } if _, errRP := rdb.Publish("icingadb:wakeup", h.ourUUID.String()).Result(); errRP != nil { From c4db185f1b45d304523a7e673c069f2ad82aa4a4 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Fri, 22 Mar 2019 11:26:38 +0100 Subject: [PATCH 31/39] Sync env with supervisor --- ha.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/ha.go b/ha.go index bb4366d2..139ef7b9 100644 --- a/ha.go +++ b/ha.go @@ -2,6 +2,7 @@ package icingadb_ha import ( "git.icinga.com/icingadb/icingadb-connection" + "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/google/uuid" log "github.com/sirupsen/logrus" "sync/atomic" @@ -38,18 +39,27 @@ const ( ) type HA struct { - ourUUID uuid.UUID - ourEnv *Environment - icinga2MTime int64 + super supervisor.Supervisor + ourUUID uuid.UUID + ourEnv *Environment + icinga2MTime int64 // responsibility tells whether we're responsible for our environment. - responsibility int32 + responsibility int32 // responsibleSince tells since when we're responsible for our environment. - responsibleSince time.Time + responsibleSince time.Time // runningCriticalOperations counts the currently running critical operations. - runningCriticalOperations uint64 + runningCriticalOperations uint64 // lastCriticalOperationEnd tells when the last critical operation finished. - lastCriticalOperationEnd int64 - notificationListeners []chan int + lastCriticalOperationEnd int64 + notificationListeners []chan int +} + +func NewHA(super supervisor.Supervisor) HA { + ha := HA{ + super: super, + } + + return ha } // RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible. @@ -414,6 +424,10 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D return nil } + h.super.EnvLock.Lock() + h.super.EnvId = h.ourEnv.ID + h.super.EnvLock.Unlock() + h.Icinga2HeartBeat() <-everySecond.C case <-everySecond.C: From af6a7acf4847fbf489866f703a17dfff0074f840 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Fri, 22 Mar 2019 12:56:42 +0100 Subject: [PATCH 32/39] Supervisor should be passed by reference --- ha.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ha.go b/ha.go index 139ef7b9..099684e0 100644 --- a/ha.go +++ b/ha.go @@ -39,7 +39,7 @@ const ( ) type HA struct { - super supervisor.Supervisor + super *supervisor.Supervisor ourUUID uuid.UUID ourEnv *Environment icinga2MTime int64 @@ -54,7 +54,7 @@ type HA struct { notificationListeners []chan int } -func NewHA(super supervisor.Supervisor) HA { +func NewHA(super *supervisor.Supervisor) HA { ha := HA{ super: super, } From 340229af85132d56cf4fd9a19d1cfda3e1da82f4 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Fri, 22 Mar 2019 15:26:12 +0100 Subject: [PATCH 33/39] Fix env --- ha.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ha.go b/ha.go index 099684e0..1d34f23a 100644 --- a/ha.go +++ b/ha.go @@ -215,6 +215,10 @@ func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.D "env": h.ourEnv.ID, }).Info("Received environment from Icinga 2") + h.super.EnvLock.Lock() + h.super.EnvId = h.ourEnv.ID + h.super.EnvLock.Unlock() + everySecond := time.NewTicker(time.Second) defer everySecond.Stop() From 80e1726cdfc690fbcef262a0f3308660103d4b77 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Mar 2019 13:20:26 +0100 Subject: [PATCH 34/39] Implement new HA --- ha.go | 515 +++++++++++------------------------------------------ ha_test.go | 115 ------------ 2 files changed, 100 insertions(+), 530 deletions(-) delete mode 100644 ha_test.go diff --git a/ha.go b/ha.go index 1d34f23a..29dc3ba1 100644 --- a/ha.go +++ b/ha.go @@ -1,460 +1,145 @@ package icingadb_ha import ( - "git.icinga.com/icingadb/icingadb-connection" + "bytes" + "fmt" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/google/uuid" log "github.com/sirupsen/logrus" - "sync/atomic" "time" ) -const ( - // readyForTakeover says that we aren't responsible, but we could take over. - resp_ReadyForTakeover = iota - // TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet. - resp_TakeoverNoSync - // TakeoverSync says that we've taken over and are actually syncing config. - resp_TakeoverSync - // stop says that we've taken over and are actually syncing config, but we're going to stop it. - resp_Stop - // notReadyForTakeover says that we aren't responsible and can't take over. - resp_NotReadyForTakeover -) - -const ( - // noAction says that we won't do anything. - action_NoAction = iota - // tryTakeover says that we're going to try to take over. - action_TryTakeover - // doTakeover says that we're going to take over. - action_DoTakeover - // ceaseOperation says that we're going to release our responsibility. - action_CeaseOperation -) - const ( Notify_StartSync = iota Notify_StopSync ) type HA struct { - super *supervisor.Supervisor - ourUUID uuid.UUID - ourEnv *Environment - icinga2MTime int64 - // responsibility tells whether we're responsible for our environment. - responsibility int32 - // responsibleSince tells since when we're responsible for our environment. - responsibleSince time.Time - // runningCriticalOperations counts the currently running critical operations. - runningCriticalOperations uint64 - // lastCriticalOperationEnd tells when the last critical operation finished. - lastCriticalOperationEnd int64 - notificationListeners []chan int + isActive bool + icinga2MTime int64 + uid uuid.UUID + super *supervisor.Supervisor + notificationListeners []chan int } -func NewHA(super *supervisor.Supervisor) HA { - ha := HA{ +func newHA(super *supervisor.Supervisor) (*HA, error) { + var err error + ho := HA{ super: super, } - return ha -} - -// RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible. -func (h *HA) RunCriticalOperation(op func() error) error { - switch h.getResponsibility() { - case resp_TakeoverSync, resp_Stop: - atomic.AddUint64(&h.runningCriticalOperations, 1) - - err := op() - - atomic.StoreInt64(&h.lastCriticalOperationEnd, time.Now().Unix()) - atomic.AddUint64(&h.runningCriticalOperations, ^uint64(0)) - - return err + if ho.uid, err = uuid.NewRandom(); err != nil { + return nil, err } - return nil + return &ho, nil } -func (h *HA) Icinga2HeartBeat() { - atomic.StoreInt64(&h.icinga2MTime, time.Now().Unix()) +func (h *HA) icinga2HeartBeat() { + h.icinga2MTime = time.Now().Unix() } -func (h *HA) IsResponsible() bool { - return h.getResponsibility() == resp_TakeoverSync +func (h *HA) AreWeActive() bool { + return h.isActive } -func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment, chErr chan error) { - go cleanUpInstancesAsync(dbw, chErr) +func (h *HA) updateInstance() error { + _, err := h.super.Dbw.SqlExec("update icingadb_instance by environment", + fmt.Sprintf("UPDATE icingadb_instance SET heartbeat = %d", h.icinga2MTime)) + return err +} - if errRun := h.run(rdb, dbw, chEnv); errRun != nil { - chErr <- errRun - return +func (h *HA) insertInstance() error { + _, err := h.super.Dbw.SqlExec("insert into icingadb_instance", + fmt.Sprintf("INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (%s, %s, %d, 'y')", + h.uid, h.super.EnvId, h.icinga2MTime)) + return err +} + +func (h *HA) getInstance() (uuid.UUID, int64, error) { + rows, err := h.super.Dbw.SqlFetchAll("select id, heartbeat from icingadb_instance where environment_id = ourEnvID", + "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", + h.super.EnvId, + ) + if err != nil { + return uuid.UUID{}, 0, err } -} - -// cleanUpInstancesAsync cleans up icingadb_instance periodically. -func cleanUpInstancesAsync(dbw *icingadb_connection.DBWrapper, chErr chan error) { - every5m := time.NewTicker(5 * time.Minute) - defer every5m.Stop() - - for { - <-every5m.C - - if errCI := cleanUpInstances(dbw); errCI != nil { - chErr <- errCI - } + if len(rows) == 0 { + return uuid.UUID{}, 0, nil } -} - -// cleanUpInstances cleans up icingadb_instance periodically. -func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { - - log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") - - errTx := dbw.SqlTransaction(true, true, false, func(tx icingadb_connection.DbTransaction) error { - _, errExec := dbw.SqlExecTx( - tx, - "delete from icingadb_instance by heartbeat", - `DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`, - time.Now().Unix(), - ) - - return errExec - }) - return errTx -} - -func (h *HA) handleResponsibility() (cont bool, nextAction int) { - switch h.getResponsibility() { - case resp_ReadyForTakeover: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_NotReadyForTakeover) - cont = true - } - - nextAction = action_TryTakeover - case resp_TakeoverNoSync: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_Stop) - cont = true - } - - nextAction = action_TryTakeover - case resp_TakeoverSync: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_Stop) - h.notifyNotificationListener(Notify_StopSync) - cont = true - } - - nextAction = action_DoTakeover - case resp_Stop: - if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { - nextAction = action_CeaseOperation - } else { - nextAction = action_DoTakeover - } - cont = false - case resp_NotReadyForTakeover: - if h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Info("Icinga 2 detected as running again.") - - h.setResponsibility(resp_ReadyForTakeover) - cont = true - } - - nextAction = action_NoAction - } - - return -} - -func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment) error { - log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") - - var hasEnv bool - h.ourEnv, hasEnv = <-chEnv - if !hasEnv { - return nil - } - - var err error - if h.ourUUID, err = uuid.NewRandom(); err != nil { - return err - } - - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.ID, - }).Info("Received environment from Icinga 2") - - h.super.EnvLock.Lock() - h.super.EnvId = h.ourEnv.ID - h.super.EnvLock.Unlock() - - everySecond := time.NewTicker(time.Second) - defer everySecond.Stop() var theirUUID uuid.UUID + copy(theirUUID[:], rows[0][0].([]byte)) - // Even if Icinga 2 is offline now, Redis may be filled - h.Icinga2HeartBeat() + return theirUUID, rows[0][1].(int64), nil +} +func (h *HA) Run(chEnv chan *Environment) { + // Wait for first heartbeat + env := <-chEnv + if env == nil { + log.Fatal("Environment empty?!") + } + h.super.EnvId = env.ID + + _, beat, err := h.getInstance() + if err != nil { + log.Fatal(err) + } + + if time.Now().Unix()-beat > 15 { + // This means there was no instance row match, insert + if beat == 0 { + err = h.insertInstance() + } else { + err = h.updateInstance() + } + if err != nil { + log.Fatal(err) + } + + h.isActive = true + h.notifyNotificationListener(Notify_StartSync) + } else { + h.isActive = false + } + + timerHA := time.NewTimer(time.Second * 15) for { - cont, nextAction := h.handleResponsibility() - if cont { - continue - } - - switch nextAction { - case action_NoAction: - break - case action_TryTakeover, action_DoTakeover: - var justTakenOver bool - - errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error { - { - rows, errFA := dbw.SqlFetchAllTxQuiet( - tx, - "select from icingadb_instance by id", - `SELECT 1 FROM icingadb_instance WHERE id = ?`, - h.ourUUID[:], - ) - if errFA != nil { - return errFA - } - - if len(rows) > 0 { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by id", - `UPDATE icingadb_instance SET environment_id=?, heartbeat=? WHERE id = ?`, - h.ourEnv.ID, - time.Now().Unix(), - h.ourUUID[:], - ) - if errExec != nil { - return errExec - } - } else { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "insert into icingadb_instance", - `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, - h.ourUUID[:], - h.ourEnv.ID, - time.Now().Unix(), - "n", - ) - if errExec != nil { - return errExec - } - } - } - - justTakenOver = false - - rows, errFA := dbw.SqlFetchAllTxQuiet( - tx, - "select from icingadb_instance by environment_id, responsible", - `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ?`, - h.ourEnv.ID, - "y", - ) - if errFA != nil { - return errFA - } - - if len(rows) > 0 { - copy(theirUUID[:], rows[0][0].([]byte)) - - if theirUUID == h.ourUUID { - justTakenOver = true - } else if time.Now().Unix()-rows[0][1].(int64) >= 10 { - { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by environment_id", - `UPDATE icingadb_instance SET responsible=? WHERE environment_id = ?`, - "n", - h.ourEnv.ID, - ) - if errExec != nil { - return errExec - } - } - - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by id", - `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, - "y", - h.ourUUID[:], - ) - if errExec != nil { - return errExec - } - - justTakenOver = true - } - } else { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by id", - `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, - "y", - h.ourUUID[:], - ) - if errExec != nil { - return errExec - } - - justTakenOver = true - } - - return nil - }) - if errTx != nil { - return errTx - } - - if justTakenOver && h.getResponsibility() != resp_Stop { - if h.responsibleSince == (time.Time{}) { - h.responsibleSince = time.Now() - h.setResponsibility(resp_TakeoverNoSync) - } else { - responsibleFor := time.Now().Sub(h.responsibleSince).Seconds() - - if responsibleFor >= 5.0 { - if h.setResponsibility(resp_TakeoverSync) == resp_TakeoverNoSync { - log.WithFields(log.Fields{ - "context": "HA", - "env": h.ourEnv.ID, - "their_uuid": theirUUID.String(), - }).Info("Taking over") - - // TODO: This should not be done here, but on configDumpInProgress changes. - // It's only possible to do it here, because we always lose responsibility during config dump once - if !h.ourEnv.configDumpInProgress { - h.notifyNotificationListener(Notify_StartSync) - } - } - - if _, errRP := rdb.Publish("icingadb:wakeup", h.ourUUID.String()).Result(); errRP != nil { - return errRP - } - } - } - } - - if !justTakenOver { - log.WithFields(log.Fields{ - "context": "HA", - "env": h.ourEnv.ID, - "their_uuid": theirUUID.String(), - }).Info("Other instance is responsible") - } - case action_CeaseOperation: - errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error { - rows, errFA := dbw.SqlFetchAllTxQuiet( - tx, - "select from icingadb_instance by environment_id, responsible, heartbeat", - `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND ? - heartbeat < 10`, - h.ourEnv.ID, - "n", - time.Now().Unix(), - ) - if errFA != nil { - return errFA - } - - if len(rows) > 0 { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance", - `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, - "n", - h.ourUUID[:], - ) - - return errExec - } - - return nil - }) - if errTx != nil { - return errTx - } - - log.WithFields(log.Fields{ - "context": "HA", - "env": h.ourEnv.ID, - }).Info("Other instance is responsible. Ceasing operations.") - - h.responsibleSince = time.Time{} - h.setResponsibility(resp_NotReadyForTakeover) - } - select { - case h.ourEnv, hasEnv = <-chEnv: - if !hasEnv { - return nil + case env := <-chEnv: + if bytes.Compare(env.ID, h.super.EnvId) != 0 { + log.Fatal("Received environment is not the one we expected. Panic.") } - h.super.EnvLock.Lock() - h.super.EnvId = h.ourEnv.ID - h.super.EnvLock.Unlock() - - h.Icinga2HeartBeat() - <-everySecond.C - case <-everySecond.C: - break + timerHA.Reset(time.Second * 15) + previous := h.icinga2MTime + h.icinga2HeartBeat() + if h.icinga2MTime-previous < 10 { + if h.isActive { + err = h.updateInstance() + } + } else { + they, beat, err := h.getInstance() + if err != nil { + log.Fatal(err) + } + if they == h.uid { + if err := h.updateInstance(); err != nil { + log.Fatal(err) + } + } else if h.icinga2MTime-beat > 15 { + h.isActive = true + h.notifyNotificationListener(Notify_StartSync) + } + } + case <-timerHA.C: + h.isActive = false + h.notifyNotificationListener(Notify_StopSync) } } } -// icinga2IsAlive returns whether Icinga 2 seems to be running. -func (h *HA) icinga2IsAlive() bool { - return time.Now().Unix()-atomic.LoadInt64(&h.icinga2MTime) < 15 -} - -// getResponsibility gets the responsibility. -func (h *HA) getResponsibility() int { - return int(atomic.LoadInt32(&h.responsibility)) -} - -// setResponsibility sets the responsibility and returns the previous one. -func (h *HA) setResponsibility(r int32) int32 { - return atomic.SwapInt32(&h.responsibility, r) -} - func (h *HA) RegisterNotificationListener() chan int { ch := make(chan int) h.notificationListeners = append(h.notificationListeners, ch) @@ -467,4 +152,4 @@ func (h *HA) notifyNotificationListener(msg int) { ch <- msg }(c) } -} \ No newline at end of file +} diff --git a/ha_test.go b/ha_test.go deleted file mode 100644 index a076b513..00000000 --- a/ha_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package icingadb_ha - -import ( - "git.icinga.com/icingadb/icingadb-connection" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var testID, _ = uuid.FromBytes(make([]byte, 16)) -var testEnv = make([]byte, 20) - -func TestHA_setResponsibility(t *testing.T) { - responsibilities := [6]int32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover } - h := new(HA) - - previous := int32(0) - for _,r := range responsibilities { - assert.Equal(t, previous, h.setResponsibility(r), "Should be equal") - previous = r - } -} - -func TestHA_IsResponsible(t *testing.T) { - h := new(HA) - h.setResponsibility(resp_TakeoverSync) - assert.True(t, h.IsResponsible(), "Should be responsible") - h.setResponsibility(resp_TakeoverNoSync) - assert.False(t, h.IsResponsible(), "Should not be responsible") -} - -func TestHA_icinga2IsAlive(t *testing.T) { - h := new(HA) - h.icinga2MTime = time.Now().Unix() - 5 - assert.True(t, h.icinga2IsAlive(), "Should be alive") - h.icinga2MTime = h.icinga2MTime - 15 - assert.False(t, h.icinga2IsAlive(), "Should be dead") -} - -func TestHA_handleResponsibility(t *testing.T) { - h := new(HA) - h.ourEnv = &icingadb_connection.Environment{make([]byte, 20), "test"} - h.setResponsibility(resp_ReadyForTakeover) - var cont bool - var na int - - print(h.icinga2MTime) - - cont, na = h.handleResponsibility() - assert.True(t, cont) - assert.Equal(t, action_TryTakeover, na) - assert.Equal(t, resp_NotReadyForTakeover, h.getResponsibility()) - - //AWAKEN - h.icinga2MTime = time.Now().Unix() - cont, na = h.handleResponsibility() - assert.True(t, cont) - assert.Equal(t, resp_ReadyForTakeover, h.getResponsibility()) - - h.setResponsibility(resp_TakeoverSync) - cont, na = h.handleResponsibility() - assert.False(t, cont) - assert.Equal(t, action_DoTakeover, na) - assert.Equal(t, resp_TakeoverSync, h.getResponsibility()) - - //SLEEP - h.icinga2MTime = 0 - cont, na = h.handleResponsibility() - assert.True(t, cont) - assert.Equal(t, na, action_DoTakeover) -} - -func Test_cleanUpInstances(t *testing.T) { - var dbw, err = icingadb_connection.NewDBWrapper( - "module-dev:icinga0815!@tcp(127.0.0.1:3306)/icingadb?") - assert.NoError(t, err, "SQL error") - - _, err = dbw.SqlExec( - "insert into icingadb_instance", - `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, - testID[:], - testEnv, - time.Now().Unix() - 25, - "n", - ) - - assert.NoError(t, err, "SQL error") - - rows, err := dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) - - assert.Equal(t, 1, len(rows)) - - err = cleanUpInstances(dbw) - - assert.NoError(t, err, "Clean up failed") - - rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) - - assert.NoError(t, err, "SQL error") - - assert.Equal(t, 1, len(rows)) - - time.Sleep(time.Second * 5) - - err = cleanUpInstances(dbw) - - assert.NoError(t, err, "Clean up failed") - - rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) - - assert.NoError(t, err, "SQL error") - - assert.Equal(t, 0, len(rows)) -} \ No newline at end of file From cccb5cfeac35e069c90aa41d65d5c6c02f0fbf23 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Mar 2019 13:59:59 +0100 Subject: [PATCH 35/39] Fix statement and constructor --- ha.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ha.go b/ha.go index 29dc3ba1..3ba95307 100644 --- a/ha.go +++ b/ha.go @@ -22,7 +22,7 @@ type HA struct { notificationListeners []chan int } -func newHA(super *supervisor.Supervisor) (*HA, error) { +func NewHA(super *supervisor.Supervisor) (*HA, error) { var err error ho := HA{ super: super, @@ -51,8 +51,8 @@ func (h *HA) updateInstance() error { func (h *HA) insertInstance() error { _, err := h.super.Dbw.SqlExec("insert into icingadb_instance", - fmt.Sprintf("INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (%s, %s, %d, 'y')", - h.uid, h.super.EnvId, h.icinga2MTime)) + fmt.Sprintf("INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES ('%s', '%s', %d, 'y')", + h.uid[:], h.super.EnvId, h.icinga2MTime)) return err } From 84285bbbaa7ada1b244fb7d76befbf15f0b71c53 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 22 Mar 2019 14:00:17 +0100 Subject: [PATCH 36/39] Improve logging --- ha.go | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/ha.go b/ha.go index 3ba95307..878eef95 100644 --- a/ha.go +++ b/ha.go @@ -2,6 +2,7 @@ package icingadb_ha import ( "bytes" + "encoding/hex" "fmt" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/google/uuid" @@ -78,29 +79,43 @@ func (h *HA) Run(chEnv chan *Environment) { // Wait for first heartbeat env := <-chEnv if env == nil { - log.Fatal("Environment empty?!") + log.WithFields(log.Fields{ + "context": "HA", + }).Fatal("Received empty environment.") } h.super.EnvId = env.ID + haLogger := log.WithFields(log.Fields{ + "context": "HA", + "environment": hex.EncodeToString(h.super.EnvId), + "UUID": h.uid, + }) + haLogger.Info("Got initial environment.") + + // We have a new UUID with every restart, no use comparing them. _, beat, err := h.getInstance() if err != nil { - log.Fatal(err) + haLogger.Fatalf("Failed to fetch instance: %v", err) } if time.Now().Unix()-beat > 15 { + haLogger.Info("Taking over.") + // This means there was no instance row match, insert if beat == 0 { err = h.insertInstance() } else { err = h.updateInstance() } + if err != nil { - log.Fatal(err) + haLogger.Fatalf("Failed to insert/update instance: %v", err) } h.isActive = true h.notifyNotificationListener(Notify_StartSync) } else { + haLogger.Info("Other instance is active.") h.isActive = false } @@ -122,18 +137,23 @@ func (h *HA) Run(chEnv chan *Environment) { } else { they, beat, err := h.getInstance() if err != nil { - log.Fatal(err) + haLogger.Fatal("Failed to fetch instance: %v", err) } if they == h.uid { + haLogger.Debug("We are active.") if err := h.updateInstance(); err != nil { - log.Fatal(err) + haLogger.Fatalf("Failed to update instance: %v", err) } } else if h.icinga2MTime-beat > 15 { + haLogger.Info("Taking over.") h.isActive = true h.notifyNotificationListener(Notify_StartSync) + } else { + haLogger.Debug("Other instance is active.") } } case <-timerHA.C: + haLogger.Info("Icinga 2 sent no heartbeat for 15 seconds, pronouncing dead.") h.isActive = false h.notifyNotificationListener(Notify_StopSync) } From 0e58208b0b706f58c203857ce791efde1d85a827 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 13 May 2019 13:11:50 +0200 Subject: [PATCH 37/39] Improve error handling Minor fixes Fix log WIP Kein Plan --- ha.go | 52 ++++++++++++++++++++++++++++++++++------------- heartbeat_test.go | 2 +- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/ha.go b/ha.go index 878eef95..4e743e12 100644 --- a/ha.go +++ b/ha.go @@ -3,6 +3,7 @@ package icingadb_ha import ( "bytes" "encoding/hex" + "errors" "fmt" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/google/uuid" @@ -44,9 +45,16 @@ func (h *HA) AreWeActive() bool { return h.isActive } -func (h *HA) updateInstance() error { - _, err := h.super.Dbw.SqlExec("update icingadb_instance by environment", - fmt.Sprintf("UPDATE icingadb_instance SET heartbeat = %d", h.icinga2MTime)) +func (h *HA) updateOwnInstance() error { + _, err := h.super.Dbw.SqlExec("update icingadb_instance by id", + fmt.Sprintf("UPDATE icingadb_instance SET heartbeat = %d WHERE id = '%s'", h.icinga2MTime, h.uid[:])) + return err +} + +func (h *HA) takeOverInstance() error { + _, err := h.super.Dbw.SqlExec("update icingadb_instance by environment_id", + fmt.Sprintf("UPDATE icingadb_instance SET id = '%s', heartbeat = %d WHERE environment_id = '%s'", + h.uid[:], h.icinga2MTime, h.super.EnvId)) return err } @@ -81,21 +89,25 @@ func (h *HA) Run(chEnv chan *Environment) { if env == nil { log.WithFields(log.Fields{ "context": "HA", - }).Fatal("Received empty environment.") + }).Error("Received empty environment.") + h.super.ChErr <- errors.New("received empty environment") + return } h.super.EnvId = env.ID haLogger := log.WithFields(log.Fields{ - "context": "HA", + "context": "HA", "environment": hex.EncodeToString(h.super.EnvId), - "UUID": h.uid, + "UUID": h.uid, }) haLogger.Info("Got initial environment.") // We have a new UUID with every restart, no use comparing them. _, beat, err := h.getInstance() if err != nil { - haLogger.Fatalf("Failed to fetch instance: %v", err) + haLogger.Errorf("Failed to fetch instance: %v", err) + h.super.ChErr <- errors.New("failed to fetch instance") + return } if time.Now().Unix()-beat > 15 { @@ -105,11 +117,13 @@ func (h *HA) Run(chEnv chan *Environment) { if beat == 0 { err = h.insertInstance() } else { - err = h.updateInstance() + err = h.takeOverInstance() } if err != nil { - haLogger.Fatalf("Failed to insert/update instance: %v", err) + haLogger.Errorf("Failed to insert/update instance: %v", err) + h.super.ChErr <- errors.New("failed to insert/update instance") + return } h.isActive = true @@ -124,28 +138,38 @@ func (h *HA) Run(chEnv chan *Environment) { select { case env := <-chEnv: if bytes.Compare(env.ID, h.super.EnvId) != 0 { - log.Fatal("Received environment is not the one we expected. Panic.") + haLogger.Error("Received environment is not the one we expected. Panic.") + h.super.ChErr <- errors.New("received unexpected environment") } timerHA.Reset(time.Second * 15) previous := h.icinga2MTime h.icinga2HeartBeat() + if h.icinga2MTime-previous < 10 { if h.isActive { - err = h.updateInstance() + err = h.updateOwnInstance() } } else { they, beat, err := h.getInstance() if err != nil { - haLogger.Fatal("Failed to fetch instance: %v", err) + haLogger.Errorf("Failed to fetch instance: %v", err) + h.super.ChErr <- errors.New("failed to fetch instance") + return } if they == h.uid { haLogger.Debug("We are active.") - if err := h.updateInstance(); err != nil { - haLogger.Fatalf("Failed to update instance: %v", err) + if err := h.updateOwnInstance(); err != nil { + haLogger.Errorf("Failed to update instance: %v", err) + h.super.ChErr <- errors.New("failed to update instance") + return } } else if h.icinga2MTime-beat > 15 { haLogger.Info("Taking over.") + if err := h.takeOverInstance(); err != nil { + haLogger.Errorf("Failed to update instance: %v", err) + h.super.ChErr <- errors.New("failed to update instance") + } h.isActive = true h.notifyNotificationListener(Notify_StartSync) } else { diff --git a/heartbeat_test.go b/heartbeat_test.go index 616d24e8..402c0a9d 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -21,7 +21,7 @@ func TestIcingaEventsBroker(t *testing.T) { t.Fatal("This test needs a working Redis connection") } - chEnv := make(chan *icingadb_connection.Environment) + chEnv := make(chan *Environment) go func() { err := IcingaEventsBroker(rdb, chEnv) From aa591eb92d5fae97b266dce0c4974c7151995247 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Mon, 13 May 2019 13:27:38 +0200 Subject: [PATCH 38/39] HA should check other instances every second --- ha.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ha.go b/ha.go index 4e743e12..1fa3564d 100644 --- a/ha.go +++ b/ha.go @@ -146,10 +146,8 @@ func (h *HA) Run(chEnv chan *Environment) { previous := h.icinga2MTime h.icinga2HeartBeat() - if h.icinga2MTime-previous < 10 { - if h.isActive { - err = h.updateOwnInstance() - } + if h.icinga2MTime-previous < 10 && h.isActive { + err = h.updateOwnInstance() } else { they, beat, err := h.getInstance() if err != nil { From 5fbb6d916c2650347095d1c7d125caa8070d2c2a Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Mon, 13 May 2019 14:41:10 +0200 Subject: [PATCH 39/39] Prepare repository merge --- ha.go => ha/ha.go | 0 heartbeat.go => ha/heartbeat.go | 0 heartbeat_test.go => ha/heartbeat_test.go | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename ha.go => ha/ha.go (100%) rename heartbeat.go => ha/heartbeat.go (100%) rename heartbeat_test.go => ha/heartbeat_test.go (100%) diff --git a/ha.go b/ha/ha.go similarity index 100% rename from ha.go rename to ha/ha.go diff --git a/heartbeat.go b/ha/heartbeat.go similarity index 100% rename from heartbeat.go rename to ha/heartbeat.go diff --git a/heartbeat_test.go b/ha/heartbeat_test.go similarity index 100% rename from heartbeat_test.go rename to ha/heartbeat_test.go