From 05cc307112f46cbccfc277e1ff161e7b928ebdbd Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 8 Dec 2020 18:10:38 +0100 Subject: [PATCH 01/12] Don't retry individual SQL transaction commands If anything in a transaction fails, the transaction should be retried as a whole; just retrying some individual command makes no sense and breaks transaction semantics. For example, if a COMMIT fails in MySQL, it automatically rolls back the transaction and if you issue another COMMIT, you just commit an empty transaction. Remove the test case for commit as it basically tested for the broken behavior described above. --- connection/mysql.go | 26 ++++++-------------------- connection/mysql_test.go | 30 ------------------------------ 2 files changed, 6 insertions(+), 50 deletions(-) diff --git a/connection/mysql.go b/connection/mysql.go index c91a1973..80fe3a30 100644 --- a/connection/mysql.go +++ b/connection/mysql.go @@ -300,12 +300,6 @@ func (dbw *DBWrapper) SqlCommit(tx DbTransaction, quiet bool) error { }).Debug("COMMIT transaction") } - if err != nil { - if dbw.isConnectionError(err) { - continue - } - } - return err } } @@ -335,12 +329,6 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error { err = tx.Rollback() } - if err != nil { - if dbw.isConnectionError(err) { - continue - } - } - return err } } @@ -382,7 +370,7 @@ func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryObserver prometh } // sqlExecInternal is a wrapper around sql.Exec() for auto-logging. 
-func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, sql string, quiet bool, args ...interface{}) (sql.Result, error) { +func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, query string, quiet bool, args ...interface{}) (sql.Result, error) { for { if !dbw.IsConnected() { dbw.WaitForConnection() @@ -394,7 +382,7 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prome benchmarc = utils.NewBenchmark() } - res, err := db.Exec(sql, args...) + res, err := db.Exec(query, args...) DbOperationsExec.Inc() if !quiet { @@ -408,12 +396,12 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prome "benchmark": benchmarc, "affected_rows": prettyPrintedRowsAffected{res}, "args": prettyPrintedArgs{args}, - "query": prettyPrintedSql{sql}, + "query": prettyPrintedSql{query}, }).Debug("Finished Exec") } if err != nil { - if dbw.isConnectionError(err) { + if _, isTx := db.(DbTransaction); !isTx && dbw.isConnectionError(err) { continue } } @@ -433,10 +421,8 @@ func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryObserve res, err := sqlTryFetchAll(db, queryObserver, query, quiet, args...) 
if err != nil { - if _, isDb := db.(*sql.DB); isDb { - if dbw.isConnectionError(err) { - continue - } + if _, isTx := db.(DbTransaction); !isTx && dbw.isConnectionError(err) { + continue } } diff --git a/connection/mysql_test.go b/connection/mysql_test.go index 32bc7f7b..da119bf5 100644 --- a/connection/mysql_test.go +++ b/connection/mysql_test.go @@ -107,36 +107,6 @@ func TestDBWrapper_CheckConnection(t *testing.T) { assert.Equal(t, uint32(11), atomic.LoadUint32(dbw.ConnectionLostCounterAtomic)) } -func TestDBWrapper_SqlCommit(t *testing.T) { - mockDb := new(DbMock) - dbw := NewTestDBW(mockDb) - mockTx := new(TransactionMock) - - mockTx.On("Commit").Return(errors.New("whoops")).Once() - mockTx.On("Commit").Return(nil).Once() - mockDb.On("Ping").Return(errors.New("whoops")).Once() - - var err error - done := make(chan bool) - - dbw.CompareAndSetConnected(true) - go func() { - err = dbw.SqlCommit(mockTx, false) - done <- true - }() - - time.Sleep(time.Millisecond * 50) - - dbw.CompareAndSetConnected(true) - dbw.ConnectionUpCondition.Broadcast() - - <-done - - assert.NoError(t, err) - mockTx.AssertExpectations(t) - mockDb.AssertExpectations(t) -} - func TestDBWrapper_SqlBegin(t *testing.T) { mockDb := new(DbMock) dbw := NewTestDBW(mockDb) From acc10490e9e2281fcfa93cea0332bad4cbe1b6e8 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 16 Dec 2020 16:38:09 +0100 Subject: [PATCH 02/12] Provide generic HA state change notifications The HA code generated special notifications for the configsync package. This commit instead provides a generic notification channel in the HA package and moves all the special handling to the configsync package which now generates its special notifications based on the generic ones. 
--- configobject/configsync/configsync.go | 5 +- configobject/configsync/configsync_test.go | 7 +- configobject/configsync/ha.go | 117 +++++++++++++++++ configobject/configsync/ha_test.go | 104 +++++++++++++++ ha/ha.go | 139 +++++++++------------ ha/ha_test.go | 86 +------------ ha/state.go | 28 +++++ main.go | 7 +- 8 files changed, 324 insertions(+), 169 deletions(-) create mode 100644 configobject/configsync/ha.go create mode 100644 configobject/configsync/ha_test.go create mode 100644 ha/state.go diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index 59996fb7..7cbabdf5 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/Icinga/icingadb/configobject" "github.com/Icinga/icingadb/connection" - "github.com/Icinga/icingadb/ha" "github.com/Icinga/icingadb/jsondecoder" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" @@ -60,14 +59,14 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co for msg := range chHA { switch msg { // Icinga 2 probably restarted or died, stop operations and tell all workers to shut down. - case ha.Notify_StopSync: + case Notify_StopSync: if done != nil { log.Debugf("%s: Lost responsibility", objectInformation.ObjectType) close(done) done = nil } // Starts up the whole sync process. 
- case ha.Notify_StartSync: + case Notify_StartSync: if done != nil { continue } diff --git a/configobject/configsync/configsync_test.go b/configobject/configsync/configsync_test.go index 1276abaf..e375a6bc 100644 --- a/configobject/configsync/configsync_test.go +++ b/configobject/configsync/configsync_test.go @@ -7,7 +7,6 @@ import ( "github.com/Icinga/icingadb/configobject" "github.com/Icinga/icingadb/configobject/objecttypes/host" "github.com/Icinga/icingadb/connection" - "github.com/Icinga/icingadb/ha" "github.com/Icinga/icingadb/jsondecoder" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" @@ -66,7 +65,7 @@ func TestOperator_InsertHost(t *testing.T) { testbackends.RedisTestClient.HSet("icinga:checksum:host", "a9ef44eb69fda8fbc32bee33322b6518057f559f", "{\"checksum\":\"b6e87de3d4f31b3d4d35466171f4088693b46071\"}") for _, ch := range chs { - ch <- ha.Notify_StartSync + ch <- Notify_StartSync } assert.Eventually(t, func() bool { @@ -142,7 +141,7 @@ func TestOperator_DeleteHost(t *testing.T) { require.NoError(t, err) for _, ch := range chs { - ch <- ha.Notify_StartSync + ch <- Notify_StartSync } assert.Eventually(t, func() bool { @@ -214,7 +213,7 @@ func TestOperator_UpdateHost(t *testing.T) { require.NoError(t, err) for _, ch := range chs { - ch <- ha.Notify_StartSync + ch <- Notify_StartSync } assert.Eventually(t, func() bool { diff --git a/configobject/configsync/ha.go b/configobject/configsync/ha.go new file mode 100644 index 00000000..df8d5125 --- /dev/null +++ b/configobject/configsync/ha.go @@ -0,0 +1,117 @@ +package configsync + +import ( + "github.com/Icinga/icingadb/ha" + "github.com/Icinga/icingadb/supervisor" + "github.com/go-redis/redis/v7" + log "github.com/sirupsen/logrus" + "sync" + "time" +) + +const ( + Notify_StartSync = iota + Notify_StopSync +) + +type ConfigSyncHA struct { + super *supervisor.Supervisor + done chan struct{} + chHA <-chan ha.State + notificationListeners map[string][]chan int + 
notificationListenersMutex sync.Mutex + lastEventId string + haIsActive bool +} + +func NewConfigSyncHA(super *supervisor.Supervisor, chHA <-chan ha.State) *ConfigSyncHA { + return &ConfigSyncHA{ + super: super, + done: make(chan struct{}), + chHA: chHA, + notificationListeners: map[string][]chan int{}, + lastEventId: "0-0", + } +} + +func (h *ConfigSyncHA) Start() { + go h.run() +} + +func (h *ConfigSyncHA) run() { + every1s := time.NewTicker(time.Second) + +loop: + for { + select { + case <-h.done: + log.Info("received done signal, shutting down") + break loop + case <-every1s.C: + h.runEventListener() + } + } +} + +func (h *ConfigSyncHA) Stop() { + close(h.done) +} + +func (h *ConfigSyncHA) runEventListener() { + select { + case newState := <-h.chHA: + h.haIsActive = newState == ha.StateActive + if !h.haIsActive { + h.lastEventId = "0-0" + h.notifyNotificationListener("*", Notify_StopSync) + } + default: // don't block if there is no change + } + + if !h.haIsActive { + return + } + + result := h.super.Rdbw.XRead(&redis.XReadArgs{Block: -1, Streams: []string{"icinga:dump", h.lastEventId}}) + streams, err := result.Result() + if err != nil { + if err.Error() != "redis: nil" { + h.super.ChErr <- err + } + return + } + + events := streams[0].Messages + if len(events) == 0 { + return + } + + for _, event := range events { + h.lastEventId = event.ID + values := event.Values + + if values["state"] == "done" { + h.notifyNotificationListener(values["type"].(string), Notify_StartSync) + } else { + h.notifyNotificationListener(values["type"].(string), Notify_StopSync) + } + } +} + +func (h *ConfigSyncHA) RegisterNotificationListener(listenerType string) chan int { + ch := make(chan int, 10) + h.notificationListenersMutex.Lock() + h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch) + h.notificationListenersMutex.Unlock() + return ch +} + +func (h *ConfigSyncHA) notifyNotificationListener(listenerType string, msg int) { + for t, chs := 
range h.notificationListeners { + if t == listenerType || listenerType == "*" { + for _, c := range chs { + c <- msg + } + } + } +} diff --git a/configobject/configsync/ha_test.go b/configobject/configsync/ha_test.go new file mode 100644 index 00000000..2ace0927 --- /dev/null +++ b/configobject/configsync/ha_test.go @@ -0,0 +1,104 @@ +package configsync + +import ( + "github.com/Icinga/icingadb/config/testbackends" + "github.com/Icinga/icingadb/connection" + "github.com/Icinga/icingadb/ha" + "github.com/Icinga/icingadb/supervisor" + "github.com/go-redis/redis/v7" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func GetSuper() *supervisor.Supervisor { + redisConn := connection.NewRDBWrapper(testbackends.RedisTestAddr, "", 64) + + return &supervisor.Supervisor{ + ChErr: make(chan error), + Rdbw: redisConn, + } +} + +func TestHA_NotificationListeners(t *testing.T) { + super := GetSuper() + chHA := make(chan ha.State) + haInst := NewConfigSyncHA(super, chHA) + + chHost := haInst.RegisterNotificationListener("host") + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + assert.Equal(t, Notify_StartSync, <-chHost) + wg.Done() + }() + + haInst.notifyNotificationListener("host", Notify_StartSync) + wg.Wait() + + chService := haInst.RegisterNotificationListener("service") + wg.Add(1) + + go func() { + assert.Equal(t, Notify_StartSync, <-chService) + wg.Done() + }() + + haInst.notifyNotificationListener("service", Notify_StartSync) + wg.Wait() + + wg.Add(1) + + go func() { + assert.Equal(t, Notify_StartSync, <-chService) + assert.Equal(t, Notify_StartSync, <-chHost) + wg.Done() + }() + + haInst.notifyNotificationListener("*", Notify_StartSync) + wg.Wait() +} + +func TestHA_EventListener(t *testing.T) { + super := GetSuper() + chHA := make(chan ha.State) + haInst := NewConfigSyncHA(super, chHA) + + haInst.Start() + defer haInst.Stop() + + chHA <- ha.StateActive + + testbackends.RedisTestClient.Del("icinga:dump") + + chHost := 
haInst.RegisterNotificationListener("host") + chService := haInst.RegisterNotificationListener("service") + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + assert.Equal(t, Notify_StartSync, <-chHost) + assert.Equal(t, Notify_StopSync, <-chHost) + assert.Equal(t, Notify_StartSync, <-chHost) + assert.Equal(t, Notify_StopSync, <-chHost) + wg.Done() + }() + + go func() { + assert.Equal(t, Notify_StartSync, <-chService) + assert.Equal(t, Notify_StopSync, <-chService) + assert.Equal(t, Notify_StartSync, <-chService) + wg.Done() + }() + + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}}) + + wg.Wait() +} diff --git a/ha/ha.go b/ha/ha.go index 3a15025d..e2d4b759 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -10,7 +10,6 @@ import ( "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" - "github.com/go-redis/redis/v7" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -18,30 +17,21 @@ import ( "time" ) -const ( - Notify_StartSync = iota - Notify_StopSync -) - type HA struct { - isActive bool - lastHeartbeat int64 - uid uuid.UUID - super *supervisor.Supervisor - notificationListeners map[string][]chan int - notificationListenersMutex sync.Mutex - lastEventId string - logger *log.Entry - heartbeatTimer *time.Timer + state 
State + stateChangeListeners []chan State + stateChangeListenersMutex sync.Mutex + lastHeartbeat int64 + uid uuid.UUID + super *supervisor.Supervisor + logger *log.Entry + heartbeatTimer *time.Timer } func NewHA(super *supervisor.Supervisor) (*HA, error) { var err error ho := HA{ - super: super, - notificationListeners: make(map[string][]chan int), - notificationListenersMutex: sync.Mutex{}, - lastEventId: "0-0", + super: super, } if ho.uid, err = uuid.NewRandom(); err != nil { @@ -65,6 +55,33 @@ var mysqlObservers = struct { connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat from icingadb_instance where environment_id = ourEnvID"), } +func (h *HA) setState(state State) { + switch state { + // valid new states (no action needed) + case StateActive: + case StateOtherActive: + case StateAllInactive: + case StateInactiveUnkown: + + // invalid arguments + case StateInit: + log.Fatal("Must not set HA state to StateInit") + default: + log.Fatalf("Trying to change to invalid HA state %d", state) + } + + if state != h.state { + if h.state != StateInit { + log.Infof("Changing HA state to %s (was %s)", state.String(), h.state.String()) + } else { + log.Infof("Changing HA state to %s", state.String()) + } + + h.state = state + h.notifyStateChangeListeners(state) + } +} + func (h *HA) updateOwnInstance(env *Environment) error { _, err := h.super.Dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, @@ -231,11 +248,10 @@ func (h *HA) checkResponsibility(env *Environment) { return } - h.isActive = true + h.setState(StateActive) } else { h.logger.Info("Other instance is active.") - h.isActive = false - h.lastEventId = "0-0" + h.setState(StateOtherActive) } } @@ -252,7 +268,7 @@ func (h *HA) runHA(chEnv chan *Environment) { previous := h.lastHeartbeat h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) - if h.lastHeartbeat-previous < 10*1000 && h.isActive { + if h.lastHeartbeat-previous < 10*1000 && h.state == StateActive { err := h.updateOwnInstance(env) if 
err != nil { @@ -269,9 +285,9 @@ func (h *HA) runHA(chEnv chan *Environment) { } if they == h.uid { h.logger.Debug("We are active.") - if !h.isActive { + if h.state != StateActive { h.logger.Info("Icinga 2 sent heartbeat. Starting sync") - h.isActive = true + h.setState(StateActive) } if err := h.updateOwnInstance(env); err != nil { @@ -285,73 +301,40 @@ func (h *HA) runHA(chEnv chan *Environment) { h.logger.Errorf("Failed to update instance: %v", err) h.super.ChErr <- errors.New("failed to update instance") } - h.isActive = true + h.setState(StateActive) } else { h.logger.Debug("Other instance is active.") } } case <-h.heartbeatTimer.C: h.logger.Info("Icinga 2 sent no heartbeat for 15 seconds. Pausing sync") - h.isActive = false - h.lastEventId = "0-0" - h.notifyNotificationListener("*", Notify_StopSync) + h.setState(StateAllInactive) } } -func (h *HA) StartEventListener() { - every1s := time.NewTicker(time.Second) +func (h *HA) RegisterStateChangeListener() <-chan State { + // The channel has a buffer of size so that it can hold the most recent state. If it is full when we try to write, + // we chan just drain it as the element it contains is outdated anyways. 
+ ch := make(chan State, 1) - for { - <-every1s.C - h.runEventListener() - } -} + h.stateChangeListenersMutex.Lock() + defer h.stateChangeListenersMutex.Unlock() -func (h *HA) runEventListener() { - if !h.isActive { - return - } - - result := h.super.Rdbw.XRead(&redis.XReadArgs{Block: -1, Streams: []string{"icinga:dump", h.lastEventId}}) - streams, err := result.Result() - if err != nil { - if err.Error() != "redis: nil" { - h.super.ChErr <- err - } - return - } - - events := streams[0].Messages - if len(events) == 0 { - return - } - - for _, event := range events { - h.lastEventId = event.ID - values := event.Values - - if values["state"] == "done" { - h.notifyNotificationListener(values["type"].(string), Notify_StartSync) - } else { - h.notifyNotificationListener(values["type"].(string), Notify_StopSync) - } - } -} - -func (h *HA) RegisterNotificationListener(listenerType string) chan int { - ch := make(chan int, 10) - h.notificationListenersMutex.Lock() - h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch) - h.notificationListenersMutex.Unlock() + h.stateChangeListeners = append(h.stateChangeListeners, ch) return ch } -func (h *HA) notifyNotificationListener(listenerType string, msg int) { - for t, chs := range h.notificationListeners { - if t == listenerType || listenerType == "*" { - for _, c := range chs { - c <- msg - } +func (h *HA) notifyStateChangeListeners(state State) { + h.stateChangeListenersMutex.Lock() + defer h.stateChangeListenersMutex.Unlock() + + for _, ch := range h.stateChangeListeners { + // drain the channel + select { + case <-ch: + default: } + + ch <- state } } diff --git a/ha/ha_test.go b/ha/ha_test.go index 6961322c..780e2b9d 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -8,7 +8,6 @@ import ( "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" - "github.com/go-redis/redis/v7" "github.com/google/uuid" log "github.com/sirupsen/logrus" 
"github.com/stretchr/testify/assert" @@ -77,7 +76,7 @@ func TestHA_checkResponsibility(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) ha.checkResponsibility(&Environment{}) - assert.Equal(t, true, ha.isActive, "HA should be responsible, if no other instance is active") + assert.Equal(t, StateActive, ha.state, "HA should be responsible, if no other instance is active") _, err := ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") require.NoError(t, err, "This test needs a working MySQL connection!") @@ -90,10 +89,10 @@ func TestHA_checkResponsibility(t *testing.T) { require.NoError(t, err, "This test needs a working MySQL connection!") - ha.isActive = false + ha.state = StateAllInactive ha.checkResponsibility(&Environment{}) - assert.Equal(t, true, ha.isActive, "HA should be responsible, if another instance was inactive for a long time") + assert.Equal(t, StateActive, ha.state, "HA should be responsible, if another instance was inactive for a long time") _, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") require.NoError(t, err, "This test needs a working MySQL connection!") @@ -104,10 +103,10 @@ func TestHA_checkResponsibility(t *testing.T) { ha.uid[:], ha.super.EnvId, utils.TimeToMillisecs(time.Now()), ) - ha.isActive = false + ha.state = StateAllInactive ha.checkResponsibility(&Environment{}) - assert.Equal(t, false, ha.isActive, "HA should not be responsible, if another instance is active") + assert.NotEqual(t, StateActive, ha.state, "HA should not be responsible, if another instance is active") } func TestHA_waitForEnvironment(t *testing.T) { @@ -203,78 +202,3 @@ func TestHA_runHA(t *testing.T) { wg.Wait() } - -func TestHA_NotificationListeners(t *testing.T) { - ha := createTestingHA(t, testbackends.RedisTestAddr) - chHost := ha.RegisterNotificationListener("host") - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - assert.Equal(t, Notify_StartSync, <-chHost) - wg.Done() - }() - 
- ha.notifyNotificationListener("host", Notify_StartSync) - wg.Wait() - - chService := ha.RegisterNotificationListener("service") - wg.Add(1) - - go func() { - assert.Equal(t, Notify_StartSync, <-chService) - wg.Done() - }() - - ha.notifyNotificationListener("service", Notify_StartSync) - wg.Wait() - - wg.Add(1) - - go func() { - assert.Equal(t, Notify_StartSync, <-chService) - assert.Equal(t, Notify_StartSync, <-chHost) - wg.Done() - }() - - ha.notifyNotificationListener("*", Notify_StartSync) - wg.Wait() -} - -func TestHA_EventListener(t *testing.T) { - ha := createTestingHA(t, testbackends.RedisTestAddr) - ha.isActive = true - go ha.StartEventListener() - - testbackends.RedisTestClient.Del("icinga:dump") - - chHost := ha.RegisterNotificationListener("host") - chService := ha.RegisterNotificationListener("service") - - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - assert.Equal(t, Notify_StartSync, <-chHost) - assert.Equal(t, Notify_StopSync, <-chHost) - assert.Equal(t, Notify_StartSync, <-chHost) - assert.Equal(t, Notify_StopSync, <-chHost) - wg.Done() - }() - - go func() { - assert.Equal(t, Notify_StartSync, <-chService) - assert.Equal(t, Notify_StopSync, <-chService) - assert.Equal(t, Notify_StartSync, <-chService) - wg.Done() - }() - - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}}) - - wg.Wait() -} diff --git 
a/ha/state.go b/ha/state.go new file mode 100644 index 00000000..3c363627 --- /dev/null +++ b/ha/state.go @@ -0,0 +1,28 @@ +package ha + +type State uint8 + +const ( + StateInit State = iota // Initial state when starting + StateActive // This instance is active + StateOtherActive // This instance is inactive but there is another one that is active + StateAllInactive // All known instances are inactive (i.e. none receives Icinga 2 heartbeats) + StateInactiveUnkown // This instance is inactive but does not known about the state of other instances +) + +func (s State) String() string { + switch s { + case StateInit: + return "init" + case StateActive: + return "active" + case StateOtherActive: + return "inactive (other instance active)" + case StateAllInactive: + return "inactive (all instances inactive)" + case StateInactiveUnkown: + return "inactive (other instances unkown)" + default: + return "(invalid)" + } +} diff --git a/main.go b/main.go index d3242085..a58f95aa 100644 --- a/main.go +++ b/main.go @@ -145,8 +145,6 @@ func main() { history.StartHistoryWorkers(&super) - go haInstance.StartEventListener() - if metricsInfo.Host != "" { go prometheus.HandleHttp("["+metricsInfo.Host+"]:"+metricsInfo.Port, super.ChErr) } @@ -228,9 +226,12 @@ func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) { &hoststate.ObjectInformation, } + configSyncHA := configsync.NewConfigSyncHA(super, haInstance.RegisterStateChangeListener()) + configSyncHA.Start() + for _, objectInformation := range objectTypes { go func(information *configobject.ObjectInformation) { - super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(information.NotificationListenerType), information) + super.ChErr <- configsync.Operator(super, configSyncHA.RegisterNotificationListener(information.NotificationListenerType), information) }(objectInformation) } } From b09e18825ae8df4a371dc62a3aa6678ad3ff7cb2 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Mon, 21 
Dec 2020 09:31:32 +0100 Subject: [PATCH 03/12] Add test demonstrating that multiple instances may take over at the same time Example output: time="2020-12-21T09:22:29+01:00" level=info msg="Taking over." UUID=bbc39673-7281-5064-a71e-91c25dbc6643 context=HA-Testing time="2020-12-21T09:22:29+01:00" level=info msg="Taking over." UUID=dddedcc5-507e-56d6-adb4-9c6451f6aeb7 context=HA-Testing time="2020-12-21T09:22:29+01:00" level=info msg="Changing HA state to active" time="2020-12-21T09:22:29+01:00" level=info msg="Changing HA state to active" ha_test.go:291: Error Trace: ha_test.go:291 Error: Not equal: expected: 1 actual : 2 Test: TestHA_ConcurrentCheckResponsibility Messages: exactly 1 instance must be active after checkResponsibility() but 2 are active --- FAIL: TestHA_ConcurrentCheckResponsibility (0.02s) --- ha/ha_test.go | 94 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 91 insertions(+), 3 deletions(-) diff --git a/ha/ha_test.go b/ha/ha_test.go index 780e2b9d..71faa3e5 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "strconv" "sync" "testing" "time" @@ -21,9 +22,7 @@ func createTestingHA(t *testing.T, redisAddr string) *HA { redisConn := connection.NewRDBWrapper(redisAddr, "", 64) mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) - if err != nil { - assert.Fail(t, "This test needs a working MySQL connection!") - } + require.NoError(t, err, "This test needs a working MySQL connection!") super := supervisor.Supervisor{ ChErr: make(chan error), @@ -49,6 +48,44 @@ func createTestingHA(t *testing.T, redisAddr string) *HA { return ha } +func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ([]*HA, <-chan error) { + redisConn := connection.NewRDBWrapper(redisAddr, "", 64) + + mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) + 
require.NoError(t, err, "This test needs a working MySQL connection!") + + _, err = mysqlConn.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") + require.NoError(t, err, "This test needs a working MySQL connection!") + + instances := make([]*HA, numInstances) + chErr := make(chan error) + + for i := 0; i < numInstances; i++ { + + super := supervisor.Supervisor{ + ChErr: chErr, + Rdbw: redisConn, + Dbw: mysqlConn, + } + + ha, _ := NewHA(&super) + + hash := sha1.New() + hash.Write([]byte("derp")) + ha.super.EnvId = hash.Sum(nil) + ha.uid = uuid.NewSHA1(uuid.MustParse("551bc748-94b2-4d27-b6a4-15c52aecfe85"), []byte(strconv.Itoa(i))) + + ha.logger = log.WithFields(log.Fields{ + "context": "HA-Testing", + "UUID": ha.uid, + }) + + instances[i] = ha + } + + return instances, chErr +} + var mysqlTestObserver = connection.DbIoSeconds.WithLabelValues("mysql", "test") func TestHA_InsertInstance(t *testing.T) { @@ -202,3 +239,54 @@ func TestHA_runHA(t *testing.T) { wg.Wait() } + +func TestHA_ConcurrentCheckResponsibility(t *testing.T) { + numAttempts := 10 + numConcurrentTakeovers := 2 + failed := false + + for attempt := 0; !failed && attempt < numAttempts; attempt++ { + wg := sync.WaitGroup{} + wg.Add(numConcurrentTakeovers) + + haInstances, chErr := createTestingMultipleHA(t, testbackends.RedisTestAddr, numConcurrentTakeovers) + for _, ha := range haInstances { + ha := ha + go func() { + defer wg.Done() + ha.checkResponsibility(&Environment{}) + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + loop: + for { + select { + case err := <-chErr: + assert.NoError(t, err, "checkResponsibility() should return no error") + if err != nil { + failed = true + } + case <-done: + break loop + } + } + + numActive := 0 + for _, ha := range haInstances { + if ha.state == StateActive { + numActive++ + } + } + + assert.Equal(t, 1, numActive, "exactly 1 instance must be active after checkResponsibility() but %d are active", numActive) + if 
numActive != 1 { + failed = true + } + } +} From c6f8360052bc71746cc208efe6eaf5ce9258abf1 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Mon, 21 Dec 2020 10:09:12 +0100 Subject: [PATCH 04/12] Rewrite HA logic to use SQL serializable transactions The old logic didn't properly handle multiple nodes trying to become active at the same time. --- ha/ha.go | 306 +++++++++++++++++++++++--------------------------- ha/ha_test.go | 192 +++++++++++++++++++------------ 2 files changed, 262 insertions(+), 236 deletions(-) diff --git a/ha/ha.go b/ha/ha.go index e2d4b759..7aee070d 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -25,9 +25,16 @@ type HA struct { uid uuid.UUID super *supervisor.Supervisor logger *log.Entry - heartbeatTimer *time.Timer } +const ( + // We consider heartbeats valid for 10 seconds + heartbeatValidMillisecs = 10 * 1000 + + // We consider the heartbeat of another instance to be expired 5 seconds after its validity ended + heartbeatTimeoutMillisecs = heartbeatValidMillisecs + 5*1000 +) + func NewHA(super *supervisor.Supervisor) (*HA, error) { var err error ho := HA{ @@ -42,17 +49,19 @@ func NewHA(super *supervisor.Supervisor) (*HA, error) { } var mysqlObservers = struct { - updateIcingadbInstanceById prometheus.Observer - updateIcingadbInstanceByEnvironmentId prometheus.Observer - insertIntoIcingadbInstance prometheus.Observer - insertIntoEnvironment prometheus.Observer - selectIdHeartbeatFromIcingadbInstanceByEnvironmentId prometheus.Observer + updateIcingadbInstanceById prometheus.Observer + updateIcingadbInstanceByEnvironmentId prometheus.Observer + insertIntoIcingadbInstance prometheus.Observer + insertIntoEnvironment prometheus.Observer + selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId prometheus.Observer + selectHeartbeatResponsibleFromIcingadbInstanceById prometheus.Observer }{ connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"), connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by 
environment_id"), connection.DbIoSeconds.WithLabelValues("mysql", "insert into icingadb_instance"), connection.DbIoSeconds.WithLabelValues("mysql", "insert into environment"), - connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat from icingadb_instance where environment_id = ourEnvID"), + connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat, responsible from icingadb_instance where environment_id = ourEnvID"), + connection.DbIoSeconds.WithLabelValues("mysql", "select heartbeat, responsible from icingadb_instance by id"), } func (h *HA) setState(state State) { @@ -82,103 +91,87 @@ func (h *HA) setState(state State) { } } -func (h *HA) updateOwnInstance(env *Environment) error { - _, err := h.super.Dbw.SqlExec( - mysqlObservers.insertIntoIcingadbInstance, - "REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, heartbeat, responsible,"+ +func (h *HA) upsertInstance(tx connection.DbTransaction, env *Environment, isActive bool) error { + if isActive { + // If we are active or become active, ensure that no other instance has the active flag set. + _, err := h.super.Dbw.SqlExecTx(tx, mysqlObservers.updateIcingadbInstanceByEnvironmentId, + "UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? 
AND responsible = ?", + utils.Bool[false], h.super.EnvId, utils.Bool[true]) + if err != nil { + return err + } + } + + _, err := h.super.Dbw.SqlExecTx( + tx, mysqlObservers.insertIntoIcingadbInstance, + "REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+ " icinga2_active_service_checks_enabled, icinga2_active_host_checks_enabled,"+ " icinga2_event_handlers_enabled, icinga2_flap_detection_enabled,"+ - " icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, 'y', ?, ?, ?, ?, ?, ?, ?, ?)", - h.uid[:], - h.super.EnvId, - env.Icinga2.EndpointId, - h.lastHeartbeat, - env.Icinga2.Version, - int64(env.Icinga2.ProgramStart*1000), - utils.Bool[env.Icinga2.NotificationsEnabled], - utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], - utils.Bool[env.Icinga2.ActiveHostChecksEnabled], - utils.Bool[env.Icinga2.EventHandlersEnabled], - utils.Bool[env.Icinga2.FlapDetectionEnabled], - utils.Bool[env.Icinga2.PerformanceDataEnabled], + " icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + h.uid[:], // id + h.super.EnvId, // environment_id + env.Icinga2.EndpointId, // endpoint_id + utils.Bool[isActive], // responsible + h.lastHeartbeat, // heartbeat + env.Icinga2.Version, // icinga2_version + int64(env.Icinga2.ProgramStart*1000), // icinga2_start_time + utils.Bool[env.Icinga2.NotificationsEnabled], // icinga2_notifications_enabled + utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], // icinga2_active_service_checks_enabled + utils.Bool[env.Icinga2.ActiveHostChecksEnabled], // icinga2_active_host_checks_enabled + utils.Bool[env.Icinga2.EventHandlersEnabled], // icinga2_event_handlers_enabled + utils.Bool[env.Icinga2.FlapDetectionEnabled], // icinga2_flap_detection_enabled + utils.Bool[env.Icinga2.PerformanceDataEnabled], // icinga2_performance_data_enabled ) return err } -func (h *HA) takeOverInstance(env *Environment) error { - _, err := 
h.super.Dbw.SqlExec( - mysqlObservers.updateIcingadbInstanceByEnvironmentId, - "UPDATE icingadb_instance SET id = ?, endpoint_id = ?, heartbeat = ?,"+ - " icinga2_version = ?, icinga2_start_time = ?, icinga2_notifications_enabled = ?,"+ - " icinga2_active_service_checks_enabled = ?, icinga2_active_host_checks_enabled = ?,"+ - " icinga2_event_handlers_enabled = ?, icinga2_flap_detection_enabled = ?,"+ - " icinga2_performance_data_enabled = ? WHERE environment_id = ?", - h.uid[:], - env.Icinga2.EndpointId, - h.lastHeartbeat, - env.Icinga2.Version, - int64(env.Icinga2.ProgramStart*1000), - utils.Bool[env.Icinga2.NotificationsEnabled], - utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], - utils.Bool[env.Icinga2.ActiveHostChecksEnabled], - utils.Bool[env.Icinga2.EventHandlersEnabled], - utils.Bool[env.Icinga2.FlapDetectionEnabled], - utils.Bool[env.Icinga2.PerformanceDataEnabled], - h.super.EnvId, +func (h *HA) getActiveInstance(tx connection.DbTransaction) (bool, uuid.UUID, error) { + rows, err := h.super.Dbw.SqlFetchAllTx( + tx, mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId, + "SELECT id, heartbeat FROM icingadb_instance"+ + " WHERE environment_id = ? AND responsible = ? 
AND heartbeat > ?", + h.super.EnvId, utils.Bool[true], utils.TimeToMillisecs(time.Now())-heartbeatTimeoutMillisecs, ) - return err -} - -func (h *HA) insertInstance(env *Environment) error { - _, err := h.super.Dbw.SqlExec( - mysqlObservers.insertIntoIcingadbInstance, - "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, heartbeat, responsible,"+ - " icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+ - " icinga2_active_service_checks_enabled, icinga2_active_host_checks_enabled,"+ - " icinga2_event_handlers_enabled, icinga2_flap_detection_enabled,"+ - " icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, 'y', ?, ?, ?, ?, ?, ?, ?, ?)", - h.uid[:], - h.super.EnvId, - env.Icinga2.EndpointId, - h.lastHeartbeat, - env.Icinga2.Version, - int64(env.Icinga2.ProgramStart*1000), - utils.Bool[env.Icinga2.NotificationsEnabled], - utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], - utils.Bool[env.Icinga2.ActiveHostChecksEnabled], - utils.Bool[env.Icinga2.EventHandlersEnabled], - utils.Bool[env.Icinga2.FlapDetectionEnabled], - utils.Bool[env.Icinga2.PerformanceDataEnabled], - ) - return err -} - -func (h *HA) getInstance() (bool, uuid.UUID, int64, error) { - rows, err := h.super.Dbw.SqlFetchAll( - mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId, - "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", - h.super.EnvId, - ) - if err != nil { - return false, uuid.UUID{}, 0, err + return false, uuid.UUID{}, err } + if len(rows) > 1 { + return false, uuid.UUID{}, errors.New("there is more than one active IcingaDB instance") + } + if len(rows) == 0 { - return false, uuid.UUID{}, 0, nil + // No active instance according to database. 
+ return false, uuid.UUID{}, nil } - var theirUUID uuid.UUID - copy(theirUUID[:], rows[0][0].([]byte)) + idBytes := rows[0][0].([]byte) + icinga2Heartbeat := rows[0][1].(int64) - return true, theirUUID, rows[0][1].(int64), nil + activeId, err := uuid.FromBytes(idBytes) + if err != nil { + return false, uuid.UUID{}, fmt.Errorf("invalid active UUID in database: %s", err.Error()) + } + + icinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - icinga2Heartbeat + + if activeId == h.uid && icinga2HeartbeatAge > heartbeatValidMillisecs { + // Our heartbeat is too old to be considered valid, no longer consider ourselves to be active. + return false, uuid.UUID{}, nil + } else if activeId != h.uid && icinga2HeartbeatAge > heartbeatTimeoutMillisecs { + // Their heartbeat is old enough to be considered timed out, no longer consider them to be active. + return false, uuid.UUID{}, nil + } + + return true, activeId, nil } func (h *HA) StartHA(chEnv chan *Environment) { env := h.waitForEnvironment(chEnv) + h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) err := h.setAndInsertEnvironment(env) if err != nil { - h.super.ChErr <- fmt.Errorf("Could not insert environment into MySQL: %s", err.Error()) + h.super.ChErr <- fmt.Errorf("could not insert environment into MySQL: %s", err.Error()) } h.logger = log.WithFields(log.Fields{ @@ -189,13 +182,7 @@ func (h *HA) StartHA(chEnv chan *Environment) { h.logger.Info("Got initial environment.") - h.checkResponsibility(env) - - h.heartbeatTimer = time.NewTimer(time.Second * 15) - - for { - h.runHA(chEnv) - } + h.runHA(chEnv, env) } func (h *HA) waitForEnvironment(chEnv chan *Environment) *Environment { @@ -225,90 +212,81 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error { } func (h *HA) checkResponsibility(env *Environment) { - found, _, beat, err := h.getInstance() - if err != nil { - h.logger.Errorf("Failed to fetch instance: %v", err) - h.super.ChErr <- errors.New("failed to fetch instance") - return - } - - if 
utils.TimeToMillisecs(time.Now())-beat > 15*1000 { - h.logger.Info("Taking over.") - - // This means there was no instance row match, insert - if !found { - err = h.insertInstance(env) - } else { - err = h.takeOverInstance(env) - } - + var newState State + err := h.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + foundActive, activeId, err := h.getActiveInstance(tx) if err != nil { - h.logger.Errorf("Failed to insert/update instance: %v", err) - h.super.ChErr <- errors.New("failed to insert/update instance") - return + return err } - h.setState(StateActive) - } else { - h.logger.Info("Other instance is active.") - h.setState(StateOtherActive) + lastIcinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - h.lastHeartbeat + lastIcinga2HeartbeatValid := lastIcinga2HeartbeatAge < heartbeatValidMillisecs + + if foundActive { + if activeId == h.uid { + if lastIcinga2HeartbeatValid { + // We are active according to the DB and have a valid heartbeat, keep it that way. + newState = StateActive + } else { + // We are active according to the DB but our heartbeat from Icinga 2 is no longer valid. + // Give up active state so that another instance has a chance to take over. + newState = StateAllInactive + } + } else { + // Some other instance is active, remain passive + newState = StateOtherActive + } + } else { + // No instance is currently active. Try take over, but only + // if we are actively receiving heartbeats from Icinga 2. + if lastIcinga2HeartbeatValid { + h.logger.Info("No active instance, trying to take over.") + newState = StateActive + } else { + newState = StateAllInactive + } + } + + err = h.upsertInstance(tx, env, newState == StateActive) + if err != nil { + return err + } + + return nil + }) + if err != nil { + // Transaction failed, we are not sure about the current global state. + // In any case, we ensure that we are no longer active. 
+ h.super.ChErr <- errors.New("HA heartbeat failed") + h.logger.Errorf("HA heartbeat failed: %s", err.Error()) + newState = StateInactiveUnkown } + h.setState(newState) } -func (h *HA) runHA(chEnv chan *Environment) { - select { - case env := <-chEnv: - if bytes.Compare(env.ID, h.super.EnvId) != 0 { - h.logger.Error("Received environment is not the one we expected. Panic.") - h.super.ChErr <- errors.New("received unexpected environment") - return - } +func (h *HA) runHA(chEnv chan *Environment, env *Environment) { + // Force regular Icinga DB heartbeat writes to the database even if we receive no heartbeats from + // Icinga 2. Icinga 2 will send an heartbeat every second, so use two seconds here to avoid + // situations like forcing the update right before we receive the next heartbeat from Icinga 2. + const updateTimerDuration = 2 * time.Second - h.heartbeatTimer.Reset(time.Second * 15) - previous := h.lastHeartbeat - h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + updateTimer := time.NewTimer(updateTimerDuration) - if h.lastHeartbeat-previous < 10*1000 && h.state == StateActive { - err := h.updateOwnInstance(env) - - if err != nil { - h.logger.Errorf("Failed to update instance: %v", err) - h.super.ChErr <- errors.New("failed to update instance") + for { + select { + case env = <-chEnv: + if bytes.Compare(env.ID, h.super.EnvId) != 0 { + h.logger.Error("Received environment is not the one we expected. Panic.") + h.super.ChErr <- errors.New("received unexpected environment") return } - } else { - _, they, beat, err := h.getInstance() - if err != nil { - h.logger.Errorf("Failed to fetch instance: %v", err) - h.super.ChErr <- errors.New("failed to fetch instance") - return - } - if they == h.uid { - h.logger.Debug("We are active.") - if h.state != StateActive { - h.logger.Info("Icinga 2 sent heartbeat. 
Starting sync") - h.setState(StateActive) - } - - if err := h.updateOwnInstance(env); err != nil { - h.logger.Errorf("Failed to update instance: %v", err) - h.super.ChErr <- errors.New("failed to update instance") - return - } - } else if h.lastHeartbeat-beat > 15*1000 { - h.logger.Info("Taking over.") - if err := h.takeOverInstance(env); err != nil { - h.logger.Errorf("Failed to update instance: %v", err) - h.super.ChErr <- errors.New("failed to update instance") - } - h.setState(StateActive) - } else { - h.logger.Debug("Other instance is active.") - } + h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + case <-updateTimer.C: // force update } - case <-h.heartbeatTimer.C: - h.logger.Info("Icinga 2 sent no heartbeat for 15 seconds. Pausing sync") - h.setState(StateAllInactive) + + updateTimer.Reset(updateTimerDuration) + + h.checkResponsibility(env) } } diff --git a/ha/ha_test.go b/ha/ha_test.go index 71faa3e5..30550d37 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -88,14 +88,16 @@ func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ( var mysqlTestObserver = connection.DbIoSeconds.WithLabelValues("mysql", "test") -func TestHA_InsertInstance(t *testing.T) { +func TestHA_UpsertInstance(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) - err := ha.insertInstance(&Environment{}) - require.NoError(t, err, "insertInstance should not return an error") + err := ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + return ha.upsertInstance(tx, &Environment{}, false) + }) + require.NoError(t, err, "transaction running upsertInstance should not return an error") rows, err := ha.super.Dbw.SqlFetchAll( - mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId, + mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId, "SELECT id, heartbeat from icingadb_instance where environment_id = ? 
LIMIT 1", ha.super.EnvId, ) @@ -109,41 +111,139 @@ func TestHA_InsertInstance(t *testing.T) { assert.Equal(t, ha.uid, theirUUID, "UUID must match") } -func TestHA_checkResponsibility(t *testing.T) { +func TestHA_checkResponsibility_NoOtherInstance(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) + + now := utils.TimeToMillisecs(time.Now()) + ha.lastHeartbeat = now ha.checkResponsibility(&Environment{}) - assert.Equal(t, StateActive, ha.state, "HA should be responsible, if no other instance is active") + assert.Equal(t, StateActive, ha.state, "HA should be active if no other instance exists") +} - _, err := ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") - require.NoError(t, err, "This test needs a working MySQL connection!") +func TestHA_checkResponsibility_OtherInactiveInstance(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) + + now := utils.TimeToMillisecs(time.Now()) + + otherUuid, err := uuid.NewRandom() + assert.NoError(t, err, "UUID generation failed") _, err = ha.super.Dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, - "INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible, icinga2_version, icinga2_start_time) VALUES (?, ?, ?, 'y', '', 0)", - ha.uid[:], ha.super.EnvId, 0, + "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?)", + otherUuid[:], ha.super.EnvId, utils.Bool[false], now, "", 0, ) - require.NoError(t, err, "This test needs a working MySQL connection!") - ha.state = StateAllInactive + ha.lastHeartbeat = now ha.checkResponsibility(&Environment{}) - assert.Equal(t, StateActive, ha.state, "HA should be responsible, if another instance was inactive for a long time") + assert.Equal(t, StateActive, ha.state, "HA should be active if there is only an inactive instance") +} - _, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") - 
require.NoError(t, err, "This test needs a working MySQL connection!") +func TestHA_checkResponsibility_OtherTimedOutInstance(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) + + now := utils.TimeToMillisecs(time.Now()) + timedOut := now - heartbeatTimeoutMillisecs + + otherUuid, err := uuid.NewRandom() + assert.NoError(t, err, "UUID generation failed") _, err = ha.super.Dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, - "INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible, icinga2_version, icinga2_start_time) VALUES (?, ?, ?, 'y', '', 0)", - ha.uid[:], ha.super.EnvId, utils.TimeToMillisecs(time.Now()), + "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?)", + otherUuid[:], ha.super.EnvId, utils.Bool[true], timedOut, "", 0, ) + require.NoError(t, err, "This test needs a working MySQL connection!") - ha.state = StateAllInactive + ha.lastHeartbeat = now ha.checkResponsibility(&Environment{}) - assert.NotEqual(t, StateActive, ha.state, "HA should not be responsible, if another instance is active") + assert.Equal(t, StateActive, ha.state, "HA should be active if another instance is timed out") +} + +func TestHA_checkResponsibility_OtherActiveInstance(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) + + now := utils.TimeToMillisecs(time.Now()) + + otherUuid, err := uuid.NewRandom() + assert.NoError(t, err, "UUID generation failed") + + _, err = ha.super.Dbw.SqlExec( + mysqlObservers.insertIntoIcingadbInstance, + "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?)", + otherUuid[:], ha.super.EnvId, utils.Bool[true], now, "", 0, + ) + require.NoError(t, err, "This test needs a working MySQL connection!") + + ha.lastHeartbeat = now + ha.checkResponsibility(&Environment{}) + + assert.Equal(t, StateOtherActive, 
ha.state, "HA should not be active if another instance is active") +} + +func TestHA_checkResponsibility_Concurrent(t *testing.T) { + numAttempts := 10 + numConcurrentTakeovers := 2 + failed := false + + for attempt := 0; !failed && attempt < numAttempts; attempt++ { + wg := sync.WaitGroup{} + wg.Add(numConcurrentTakeovers) + + haInstances, chErr := createTestingMultipleHA(t, testbackends.RedisTestAddr, numConcurrentTakeovers) + for _, ha := range haInstances { + ha.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + } + + for _, ha := range haInstances { + ha := ha + go func() { + defer wg.Done() + ha.checkResponsibility(&Environment{}) + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + loop: + for { + select { + case err := <-chErr: + assert.NoError(t, err, "checkResponsibility() should return no error") + if err != nil { + failed = true + } + case <-done: + break loop + } + } + + numActive := 0 + for _, ha := range haInstances { + if ha.state == StateActive { + numActive++ + } + } + + assert.Equal(t, 1, numActive, "exactly 1 instance must be active after checkResponsibility() but %d are active", numActive) + if numActive != 1 { + failed = true + } + } } func TestHA_waitForEnvironment(t *testing.T) { @@ -205,7 +305,6 @@ func TestHA_setAndInsertEnvironment(t *testing.T) { func TestHA_runHA(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) - ha.heartbeatTimer = time.NewTimer(10 * time.Second) chEnv := make(chan *Environment) @@ -235,58 +334,7 @@ func TestHA_runHA(t *testing.T) { wg.Done() }() - ha.runHA(chEnv) + ha.runHA(chEnv, &Environment{}) wg.Wait() } - -func TestHA_ConcurrentCheckResponsibility(t *testing.T) { - numAttempts := 10 - numConcurrentTakeovers := 2 - failed := false - - for attempt := 0; !failed && attempt < numAttempts; attempt++ { - wg := sync.WaitGroup{} - wg.Add(numConcurrentTakeovers) - - haInstances, chErr := createTestingMultipleHA(t, testbackends.RedisTestAddr, 
numConcurrentTakeovers) - for _, ha := range haInstances { - ha := ha - go func() { - defer wg.Done() - ha.checkResponsibility(&Environment{}) - }() - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - loop: - for { - select { - case err := <-chErr: - assert.NoError(t, err, "checkResponsibility() should return no error") - if err != nil { - failed = true - } - case <-done: - break loop - } - } - - numActive := 0 - for _, ha := range haInstances { - if ha.state == StateActive { - numActive++ - } - } - - assert.Equal(t, 1, numActive, "exactly 1 instance must be active after checkResponsibility() but %d are active", numActive) - if numActive != 1 { - failed = true - } - } -} From f3d21d5a95a3c29ad6b9904cad46af0a26c30167 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Mon, 21 Dec 2020 10:34:06 +0100 Subject: [PATCH 05/12] Add test for HA RegisterStateChangeListener --- ha/ha_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ha/ha_test.go b/ha/ha_test.go index 30550d37..fc1e2d37 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -338,3 +338,26 @@ func TestHA_runHA(t *testing.T) { wg.Wait() } + +func TestHA_RegisterStateChangeListener(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) + + assertNonBlockingState := func(expected State, ch <-chan State, msgAndArgs ...interface{}) { + select { + case actual := <-ch: + assert.Equal(t, expected, actual, msgAndArgs...) 
+ default: + assert.Fail(t, "reading from channel should not block", msgAndArgs) + } + } + + chHA := ha.RegisterStateChangeListener() + + ha.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + ha.checkResponsibility(&Environment{}) + assertNonBlockingState(StateActive, chHA, "HA should send StateActive to state change channel when becoming active") + + ha.lastHeartbeat = 0 + ha.checkResponsibility(&Environment{}) + assertNonBlockingState(StateAllInactive, chHA, "HA should send StateAllInactive to state change channel when becoming inactive") +} From 335ff91fb78ed62f5209f50ae50c3fa18bb9b5ba Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 21 Jan 2021 12:23:16 +0100 Subject: [PATCH 06/12] Remove previous rows from icingadb_instance table To prevent this table from growing indefinitely, this now removes timed out rows for the same environment_id and endpoint_id. --- ha/ha.go | 19 +++++++++++++++++ ha/ha_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/ha/ha.go b/ha/ha.go index 7aee070d..4a4e7a9f 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -55,6 +55,7 @@ var mysqlObservers = struct { insertIntoEnvironment prometheus.Observer selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId prometheus.Observer selectHeartbeatResponsibleFromIcingadbInstanceById prometheus.Observer + deleteIcingadbInstanceByEndpointId prometheus.Observer }{ connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"), connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"), @@ -62,6 +63,7 @@ var mysqlObservers = struct { connection.DbIoSeconds.WithLabelValues("mysql", "insert into environment"), connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat, responsible from icingadb_instance where environment_id = ourEnvID"), connection.DbIoSeconds.WithLabelValues("mysql", "select heartbeat, responsible from icingadb_instance by id"), + 
connection.DbIoSeconds.WithLabelValues("mysql", "delete from icingadb_instance by endpoint_id"), } func (h *HA) setState(state State) { @@ -211,9 +213,26 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error { return err } +// Remove rows from icingadb_instance that were created by previous startups of this instance. +// A row is considered to be created by this instance if it shares the same environment_id and +// endpoint_id. Rows with a recent heartbeat are never removed. +func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environment) error { + heartbeatTimeoutThreshold := utils.TimeToMillisecs(time.Now()) - heartbeatTimeoutMillisecs + _, err := h.super.Dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId, + "DELETE FROM icingadb_instance "+ + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", + h.uid[:], h.super.EnvId, env.Icinga2.EndpointId, heartbeatTimeoutThreshold) + return err +} + func (h *HA) checkResponsibility(env *Environment) { var newState State err := h.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + err := h.removePreviousInstances(tx, env) + if err != nil { + return err + } + foundActive, activeId, err := h.getActiveInstance(tx) if err != nil { return err diff --git a/ha/ha_test.go b/ha/ha_test.go index fc1e2d37..4e82a190 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -4,6 +4,7 @@ package ha import ( "crypto/sha1" + "encoding/hex" "github.com/Icinga/icingadb/config/testbackends" "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/supervisor" @@ -361,3 +362,58 @@ func TestHA_RegisterStateChangeListener(t *testing.T) { ha.checkResponsibility(&Environment{}) assertNonBlockingState(StateAllInactive, chHA, "HA should send StateAllInactive to state change channel when becoming inactive") } + +func TestHA_removePreviousInstances(t *testing.T) { + env := &Environment{} + env.Name = "test" + env.ID = 
Sha1bytes([]byte(env.Name)) + env.Icinga2.EndpointId = make([]byte, 20) + + ha := createTestingHA(t, testbackends.RedisTestAddr) + err := ha.setAndInsertEnvironment(env) + require.NoError(t, err, "setAndInsertEnvironment should not return an error") + + err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + return ha.upsertInstance(tx, &Environment{}, false) + }) + require.NoError(t, err, "upsertInstance() should not return an error") + + now := utils.TimeToMillisecs(time.Now()) + activeUuid, err := uuid.NewRandom() + require.NoError(t, err, "UUID generation failed") + _, err = ha.super.Dbw.SqlExec( + mysqlObservers.insertIntoIcingadbInstance, + "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?, ?)", + activeUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], now, "", 0, + ) + require.NoError(t, err, "This test needs a working MySQL connection!") + + timedOut := now - heartbeatTimeoutMillisecs + timedOutUuid, err := uuid.NewRandom() + require.NoError(t, err, "UUID generation failed") + _, err = ha.super.Dbw.SqlExec( + mysqlObservers.insertIntoIcingadbInstance, + "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?, ?)", + timedOutUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], timedOut, "", 0, + ) + require.NoError(t, err, "This test needs a working MySQL connection!") + + err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + return ha.removePreviousInstances(tx, env) + }) + assert.NoError(t, err, "removePreviousInstances() should not return an error") + + rows, err := ha.super.Dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance") + var instanceIds []string + for _, row := range rows { + instanceIds = 
append(instanceIds, hex.EncodeToString(row[0].([]byte))) + } + + assert.Contains(t, instanceIds, hex.EncodeToString(ha.uid[:]), "removePreviousInstance() should not remove its own row") + assert.Contains(t, instanceIds, hex.EncodeToString(activeUuid[:]), "removePreviousInstance() should not remove rows that are not timed out yet") + assert.NotContains(t, instanceIds, hex.EncodeToString(timedOutUuid[:]), "removePreviousInstances() should remove timed out instance") +} From 3f6a6534c1237aab62b4357f4aea69435f7f965b Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 4 Feb 2021 09:16:18 +0100 Subject: [PATCH 07/12] Detect and warn about slow HA heartbeat SQL transactions --- ha/ha.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ha/ha.go b/ha/ha.go index 4a4e7a9f..7c91eade 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -226,6 +226,8 @@ func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environme } func (h *HA) checkResponsibility(env *Environment) { + start := time.Now() + var newState State err := h.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { err := h.removePreviousInstances(tx, env) @@ -280,6 +282,22 @@ func (h *HA) checkResponsibility(env *Environment) { h.logger.Errorf("HA heartbeat failed: %s", err.Error()) newState = StateInactiveUnkown } + + txDuration := time.Since(start) + icinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - h.lastHeartbeat + if newState == StateActive && txDuration > heartbeatValidMillisecs*time.Millisecond/2 { + // The SQL transaction is too slow if it takes more than half the heartbeat validity + // period as in this case, we cannot expect to renew the heartbeat in time. + h.logger.Warnf("SQL transaction took %s, too slow to keep our heartbeat alive. 
"+ + "Check the health of your database.", txDuration) + newState = StateAllInactive + } else if newState == StateActive && icinga2HeartbeatAge > heartbeatValidMillisecs { + // If this was a forced periodic update, the heartbeat might also have + // expired during the execution of the SQL transaction. + h.logger.Warnf("Icinga 2 heartbeat expired during SQL transaction") + newState = StateAllInactive + } + h.setState(newState) } From 2957f32c7705eca5c57a046da50a0ec99adc02bc Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 4 Feb 2021 09:16:47 +0100 Subject: [PATCH 08/12] HA: prefer new Icinga 2 heartbeat over forced update --- ha/ha.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/ha/ha.go b/ha/ha.go index 7c91eade..6437062c 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -310,15 +310,29 @@ func (h *HA) runHA(chEnv chan *Environment, env *Environment) { updateTimer := time.NewTimer(updateTimerDuration) for { + var newEnv *Environment + + // Selecting from multiple channels does not guarantee which case gets executed if multiple ones + // are ready. However, when both a new environment is available and the update timer expired, we + // want to prefer the new environment. Therefore first try to select only from chEnv and only if + // there is nothing in the channel, i.e. in the default case, select from both channels. select { - case env = <-chEnv: + case newEnv = <-chEnv: + default: + select { + case newEnv = <-chEnv: + case <-updateTimer.C: + } + } + + if newEnv != nil { + env = newEnv if bytes.Compare(env.ID, h.super.EnvId) != 0 { h.logger.Error("Received environment is not the one we expected. 
Panic.") h.super.ChErr <- errors.New("received unexpected environment") return } h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) - case <-updateTimer.C: // force update } updateTimer.Reset(updateTimerDuration) From fc1c9c81349be20b3f0f3cfd4a590a5801f98e71 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 9 Feb 2021 16:16:40 +0100 Subject: [PATCH 09/12] Improve log messages related to HA takeover/handover --- ha/ha.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/ha/ha.go b/ha/ha.go index 6437062c..28350195 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -76,16 +76,16 @@ func (h *HA) setState(state State) { // invalid arguments case StateInit: - log.Fatal("Must not set HA state to StateInit") + h.logger.Fatal("Must not set HA state to StateInit") default: - log.Fatalf("Trying to change to invalid HA state %d", state) + h.logger.Fatalf("Trying to change to invalid HA state %d", state) } if state != h.state { if h.state != StateInit { - log.Infof("Changing HA state to %s (was %s)", state.String(), h.state.String()) + h.logger.Infof("Changing HA state to %s (was %s)", state.String(), h.state.String()) } else { - log.Infof("Changing HA state to %s", state.String()) + h.logger.Infof("Changing HA state to %s", state.String()) } h.state = state @@ -251,6 +251,7 @@ func (h *HA) checkResponsibility(env *Environment) { } else { // We are active according to the DB but our heartbeat from Icinga 2 is no longer valid. // Give up active state so that another instance has a chance to take over. + h.logger.Info("Becoming inactive due to expired Icinga 2 heartbeat.") newState = StateAllInactive } } else { @@ -264,6 +265,11 @@ func (h *HA) checkResponsibility(env *Environment) { h.logger.Info("No active instance, trying to take over.") newState = StateActive } else { + if h.state == StateActive { + // We are active according to our last state, however when + // reading from the database our heartbeat expired already. 
+ h.logger.Info("Becoming inactive due to expired Icinga 2 heartbeat.") + } newState = StateAllInactive } } From 29f82647c09f924d563981686c663cba576b15b3 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 10 Feb 2021 14:25:27 +0100 Subject: [PATCH 10/12] HA: use separate Redis connection A dedicated Redis connection helps to receive heartbeats from Icinga 2 even if there is a high load on the normal Redis connection pool. --- ha/ha_test.go | 6 ------ main.go | 3 ++- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/ha/ha_test.go b/ha/ha_test.go index 4e82a190..d8ee1d0b 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -20,14 +20,11 @@ import ( ) func createTestingHA(t *testing.T, redisAddr string) *HA { - redisConn := connection.NewRDBWrapper(redisAddr, "", 64) - mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) require.NoError(t, err, "This test needs a working MySQL connection!") super := supervisor.Supervisor{ ChErr: make(chan error), - Rdbw: redisConn, Dbw: mysqlConn, } @@ -50,8 +47,6 @@ func createTestingHA(t *testing.T, redisAddr string) *HA { } func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ([]*HA, <-chan error) { - redisConn := connection.NewRDBWrapper(redisAddr, "", 64) - mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) require.NoError(t, err, "This test needs a working MySQL connection!") @@ -65,7 +60,6 @@ func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ( super := supervisor.Supervisor{ ChErr: chErr, - Rdbw: redisConn, Dbw: mysqlConn, } diff --git a/main.go b/main.go index a58f95aa..07027718 100644 --- a/main.go +++ b/main.go @@ -106,6 +106,7 @@ func main() { metricsInfo := config.GetMetricsInfo() redisConn := connection.NewRDBWrapper(redisInfo.Host+":"+redisInfo.Port, redisInfo.Password, redisInfo.PoolSize) + redisConnHa := connection.NewRDBWrapper(redisInfo.Host+":"+redisInfo.Port, redisInfo.Password, 1) var dbDSN string if 
filepath.IsAbs(mysqlInfo.Host) { @@ -135,7 +136,7 @@ func main() { } go haInstance.StartHA(chEnv) - go ha.IcingaHeartbeatListener(redisConn, chEnv, super.ChErr) + go ha.IcingaHeartbeatListener(redisConnHa, chEnv, super.ChErr) go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16) From 4374eec84981c33fe6d7e89a6548d989b431a892 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 10 Feb 2021 14:33:45 +0100 Subject: [PATCH 11/12] HA: use separate MySQL connection A dedicated MySQL connection for HA heartbeats to help get them done reliably even if the normal connection pool is under heavy use. --- ha/ha.go | 16 +++++++++------- ha/ha_test.go | 32 +++++++++++++++----------------- main.go | 6 +++++- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/ha/ha.go b/ha/ha.go index 28350195..8ec1c0cb 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -25,6 +25,7 @@ type HA struct { uid uuid.UUID super *supervisor.Supervisor logger *log.Entry + dbw *connection.DBWrapper } const ( @@ -35,10 +36,11 @@ const ( heartbeatTimeoutMillisecs = heartbeatValidMillisecs + 5*1000 ) -func NewHA(super *supervisor.Supervisor) (*HA, error) { +func NewHA(super *supervisor.Supervisor, dbw *connection.DBWrapper) (*HA, error) { var err error ho := HA{ super: super, + dbw: dbw, } if ho.uid, err = uuid.NewRandom(); err != nil { @@ -96,7 +98,7 @@ func (h *HA) setState(state State) { func (h *HA) upsertInstance(tx connection.DbTransaction, env *Environment, isActive bool) error { if isActive { // If we are active or become active, ensure that no other instance has the active flag set. - _, err := h.super.Dbw.SqlExecTx(tx, mysqlObservers.updateIcingadbInstanceByEnvironmentId, + _, err := h.dbw.SqlExecTx(tx, mysqlObservers.updateIcingadbInstanceByEnvironmentId, "UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? 
AND responsible = ?", utils.Bool[false], h.super.EnvId, utils.Bool[true]) if err != nil { @@ -104,7 +106,7 @@ func (h *HA) upsertInstance(tx connection.DbTransaction, env *Environment, isAct } } - _, err := h.super.Dbw.SqlExecTx( + _, err := h.dbw.SqlExecTx( tx, mysqlObservers.insertIntoIcingadbInstance, "REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+ @@ -129,7 +131,7 @@ func (h *HA) upsertInstance(tx connection.DbTransaction, env *Environment, isAct } func (h *HA) getActiveInstance(tx connection.DbTransaction) (bool, uuid.UUID, error) { - rows, err := h.super.Dbw.SqlFetchAllTx( + rows, err := h.dbw.SqlFetchAllTx( tx, mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId, "SELECT id, heartbeat FROM icingadb_instance"+ " WHERE environment_id = ? AND responsible = ? AND heartbeat > ?", @@ -204,7 +206,7 @@ func (h *HA) waitForEnvironment(chEnv chan *Environment) *Environment { func (h *HA) setAndInsertEnvironment(env *Environment) error { h.super.EnvId = env.ID - _, err := h.super.Dbw.SqlExec( + _, err := h.dbw.SqlExec( mysqlObservers.insertIntoEnvironment, "REPLACE INTO environment(id, name) VALUES (?, ?)", env.ID, env.Name, @@ -218,7 +220,7 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error { // endpoint_id. Rows with a recent heartbeat are never removed. func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environment) error { heartbeatTimeoutThreshold := utils.TimeToMillisecs(time.Now()) - heartbeatTimeoutMillisecs - _, err := h.super.Dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId, + _, err := h.dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId, "DELETE FROM icingadb_instance "+ "WHERE id != ? AND environment_id = ? AND endpoint_id = ? 
AND heartbeat < ?", h.uid[:], h.super.EnvId, env.Icinga2.EndpointId, heartbeatTimeoutThreshold) @@ -229,7 +231,7 @@ func (h *HA) checkResponsibility(env *Environment) { start := time.Now() var newState State - err := h.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + err := h.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { err := h.removePreviousInstances(tx, env) if err != nil { return err diff --git a/ha/ha_test.go b/ha/ha_test.go index d8ee1d0b..c0db909e 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -20,22 +20,21 @@ import ( ) func createTestingHA(t *testing.T, redisAddr string) *HA { - mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) + mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 1) require.NoError(t, err, "This test needs a working MySQL connection!") super := supervisor.Supervisor{ ChErr: make(chan error), - Dbw: mysqlConn, } - ha, _ := NewHA(&super) + ha, _ := NewHA(&super, mysqlConn) hash := sha1.New() hash.Write([]byte("derp")) ha.super.EnvId = hash.Sum(nil) ha.uid = uuid.MustParse("551bc748-94b2-4d27-b6a4-15c52aecfe85") - _, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") + _, err = ha.dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") require.NoError(t, err, "This test needs a working MySQL connection!") ha.logger = log.WithFields(log.Fields{ @@ -60,10 +59,9 @@ func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ( super := supervisor.Supervisor{ ChErr: chErr, - Dbw: mysqlConn, } - ha, _ := NewHA(&super) + ha, _ := NewHA(&super, mysqlConn) hash := sha1.New() hash.Write([]byte("derp")) @@ -86,12 +84,12 @@ var mysqlTestObserver = connection.DbIoSeconds.WithLabelValues("mysql", "test") func TestHA_UpsertInstance(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) - err := ha.super.Dbw.SqlTransaction(true, false, false, func(tx 
connection.DbTransaction) error { + err := ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { return ha.upsertInstance(tx, &Environment{}, false) }) require.NoError(t, err, "transaction running upsertInstance should not return an error") - rows, err := ha.super.Dbw.SqlFetchAll( + rows, err := ha.dbw.SqlFetchAll( mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId, "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", ha.super.EnvId, @@ -124,7 +122,7 @@ func TestHA_checkResponsibility_OtherInactiveInstance(t *testing.T) { otherUuid, err := uuid.NewRandom() assert.NoError(t, err, "UUID generation failed") - _, err = ha.super.Dbw.SqlExec( + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time)"+ @@ -148,7 +146,7 @@ func TestHA_checkResponsibility_OtherTimedOutInstance(t *testing.T) { otherUuid, err := uuid.NewRandom() assert.NoError(t, err, "UUID generation failed") - _, err = ha.super.Dbw.SqlExec( + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time)"+ @@ -171,7 +169,7 @@ func TestHA_checkResponsibility_OtherActiveInstance(t *testing.T) { otherUuid, err := uuid.NewRandom() assert.NoError(t, err, "UUID generation failed") - _, err = ha.super.Dbw.SqlExec( + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time)"+ @@ -287,7 +285,7 @@ func TestHA_setAndInsertEnvironment(t *testing.T) { err := ha.setAndInsertEnvironment(&env) require.NoError(t, err, "setAndInsertEnvironment should not return an error") - rows, err := ha.super.Dbw.SqlFetchAll( + rows, err := ha.dbw.SqlFetchAll( mysqlTestObserver, "SELECT 
name from environment where id = ? LIMIT 1", ha.super.EnvId, @@ -367,7 +365,7 @@ func TestHA_removePreviousInstances(t *testing.T) { err := ha.setAndInsertEnvironment(env) require.NoError(t, err, "setAndInsertEnvironment should not return an error") - err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + err = ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { return ha.upsertInstance(tx, &Environment{}, false) }) require.NoError(t, err, "upsertInstance() should not return an error") @@ -375,7 +373,7 @@ func TestHA_removePreviousInstances(t *testing.T) { now := utils.TimeToMillisecs(time.Now()) activeUuid, err := uuid.NewRandom() require.NoError(t, err, "UUID generation failed") - _, err = ha.super.Dbw.SqlExec( + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time)"+ @@ -387,7 +385,7 @@ func TestHA_removePreviousInstances(t *testing.T) { timedOut := now - heartbeatTimeoutMillisecs timedOutUuid, err := uuid.NewRandom() require.NoError(t, err, "UUID generation failed") - _, err = ha.super.Dbw.SqlExec( + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time)"+ @@ -396,12 +394,12 @@ func TestHA_removePreviousInstances(t *testing.T) { ) require.NoError(t, err, "This test needs a working MySQL connection!") - err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + err = ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { return ha.removePreviousInstances(tx, env) }) assert.NoError(t, err, "removePreviousInstances() should not return an error") - rows, err := ha.super.Dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance") + 
rows, err := ha.dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance") var instanceIds []string for _, row := range rows { instanceIds = append(instanceIds, hex.EncodeToString(row[0].([]byte))) diff --git a/main.go b/main.go index 07027718..43b43b9e 100644 --- a/main.go +++ b/main.go @@ -119,6 +119,10 @@ func main() { if err != nil { log.Fatal(err) } + mysqlConnHa, err := connection.NewDBWrapper(dbDSN, 1) + if err != nil { + log.Fatal(err) + } super := supervisor.Supervisor{ ChErr: make(chan error), @@ -130,7 +134,7 @@ func main() { } chEnv := make(chan *ha.Environment) - haInstance, err := ha.NewHA(&super) + haInstance, err := ha.NewHA(&super, mysqlConnHa) if err != nil { log.Fatal(err) } From 9d23bbec2ed3de21dac1bd3bff51c1927ce24ce4 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 12 Feb 2021 09:01:55 +0100 Subject: [PATCH 12/12] HA: only read new heartbeats from Redis Selecting any heartbeat from the last 15 seconds from Redis makes little sense, just wait for new ones. If Icinga 2 just stopped, there is no point in becoming active for just a few seconds, otherwise the next heartbeat will arrive in a second anyways. --- ha/heartbeat.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ha/heartbeat.go b/ha/heartbeat.go index 56260c7e..db8f933f 100644 --- a/ha/heartbeat.go +++ b/ha/heartbeat.go @@ -5,12 +5,10 @@ package ha import ( "crypto/sha1" "encoding/json" - "fmt" "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/utils" "github.com/go-redis/redis/v7" log "github.com/sirupsen/logrus" - "time" ) type Environment struct { @@ -43,7 +41,7 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment log.Info("Starting heartbeat listener") xReadArgs := redis.XReadArgs{ - Streams: []string{"icinga:stats", fmt.Sprintf("%d-0", utils.TimeToMillisecs(time.Now().Add(-15*time.Second)))}, + Streams: []string{"icinga:stats", "$"}, Count: 1, Block: 0, }