diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index 59996fb7..7cbabdf5 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/Icinga/icingadb/configobject" "github.com/Icinga/icingadb/connection" - "github.com/Icinga/icingadb/ha" "github.com/Icinga/icingadb/jsondecoder" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" @@ -60,14 +59,14 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co for msg := range chHA { switch msg { // Icinga 2 probably restarted or died, stop operations and tell all workers to shut down. - case ha.Notify_StopSync: + case Notify_StopSync: if done != nil { log.Debugf("%s: Lost responsibility", objectInformation.ObjectType) close(done) done = nil } // Starts up the whole sync process. - case ha.Notify_StartSync: + case Notify_StartSync: if done != nil { continue } diff --git a/configobject/configsync/configsync_test.go b/configobject/configsync/configsync_test.go index 1276abaf..e375a6bc 100644 --- a/configobject/configsync/configsync_test.go +++ b/configobject/configsync/configsync_test.go @@ -7,7 +7,6 @@ import ( "github.com/Icinga/icingadb/configobject" "github.com/Icinga/icingadb/configobject/objecttypes/host" "github.com/Icinga/icingadb/connection" - "github.com/Icinga/icingadb/ha" "github.com/Icinga/icingadb/jsondecoder" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" @@ -66,7 +65,7 @@ func TestOperator_InsertHost(t *testing.T) { testbackends.RedisTestClient.HSet("icinga:checksum:host", "a9ef44eb69fda8fbc32bee33322b6518057f559f", "{\"checksum\":\"b6e87de3d4f31b3d4d35466171f4088693b46071\"}") for _, ch := range chs { - ch <- ha.Notify_StartSync + ch <- Notify_StartSync } assert.Eventually(t, func() bool { @@ -142,7 +141,7 @@ func TestOperator_DeleteHost(t *testing.T) { require.NoError(t, err) for _, ch := range chs { - ch <- ha.Notify_StartSync + ch <- Notify_StartSync } assert.Eventually(t, func() bool { @@ -214,7 +213,7 @@ func TestOperator_UpdateHost(t *testing.T) { require.NoError(t, err) for _, ch := range chs { - ch <- ha.Notify_StartSync + ch <- Notify_StartSync } assert.Eventually(t, func() bool { diff --git a/configobject/configsync/ha.go b/configobject/configsync/ha.go new file mode 100644 index 00000000..df8d5125 --- /dev/null +++ b/configobject/configsync/ha.go @@ -0,0 +1,117 @@ +package configsync + +import ( + "github.com/Icinga/icingadb/ha" + "github.com/Icinga/icingadb/supervisor" + "github.com/go-redis/redis/v7" + log "github.com/sirupsen/logrus" + "sync" + "time" +) + +const ( + Notify_StartSync = iota + Notify_StopSync +) + +type ConfigSyncHA struct { + super *supervisor.Supervisor + done chan struct{} + chHA <-chan ha.State + notificationListeners map[string][]chan int + notificationListenersMutex sync.Mutex + lastEventId string + haIsActive bool +} + +func NewConfigSyncHA(super *supervisor.Supervisor, chHA <-chan ha.State) *ConfigSyncHA { + return &ConfigSyncHA{ + super: super, + done: make(chan struct{}), + chHA: chHA, + notificationListeners: map[string][]chan int{}, + lastEventId: "0-0", + } +} + +func (h *ConfigSyncHA) Start() { + go h.run() +} + +func (h *ConfigSyncHA) run() { + every1s := time.NewTicker(time.Second) + +loop: + for { + select { + case <-h.done: + log.Info("received done signal, shutting down") + break loop + case <-every1s.C: + h.runEventListener() + } + } +} + +func (h *ConfigSyncHA) Stop() { + close(h.done) +} + +func (h *ConfigSyncHA) runEventListener() { + select { + case newState := <-h.chHA: + h.haIsActive = newState == ha.StateActive + if !h.haIsActive { + h.lastEventId = "0-0" + h.notifyNotificationListener("*", Notify_StopSync) + } + default: // don't block if there is no change + } + + if !h.haIsActive { + return + } + + result := h.super.Rdbw.XRead(&redis.XReadArgs{Block: -1, Streams: []string{"icinga:dump", h.lastEventId}}) + streams, err := result.Result() + if err != nil { + if err.Error() != "redis: nil" { + h.super.ChErr <- err + } + return + } + + events := streams[0].Messages + if len(events) == 0 { + return + } + + for _, event := range events { + h.lastEventId = event.ID + values := event.Values + + if values["state"] == "done" { + h.notifyNotificationListener(values["type"].(string), Notify_StartSync) + } else { + h.notifyNotificationListener(values["type"].(string), Notify_StopSync) + } + } +} + +func (h *ConfigSyncHA) RegisterNotificationListener(listenerType string) chan int { + ch := make(chan int, 10) + h.notificationListenersMutex.Lock() + h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch) + h.notificationListenersMutex.Unlock() + return ch +} + +func (h *ConfigSyncHA) notifyNotificationListener(listenerType string, msg int) { + for t, chs := range h.notificationListeners { + if t == listenerType || listenerType == "*" { + for _, c := range chs { + c <- msg + } + } + } +} diff --git a/configobject/configsync/ha_test.go b/configobject/configsync/ha_test.go new file mode 100644 index 00000000..2ace0927 --- /dev/null +++ b/configobject/configsync/ha_test.go @@ -0,0 +1,104 @@ +package configsync + +import ( + "github.com/Icinga/icingadb/config/testbackends" + "github.com/Icinga/icingadb/connection" + "github.com/Icinga/icingadb/ha" + "github.com/Icinga/icingadb/supervisor" + "github.com/go-redis/redis/v7" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func GetSuper() *supervisor.Supervisor { + redisConn := connection.NewRDBWrapper(testbackends.RedisTestAddr, "", 64) + + return &supervisor.Supervisor{ + ChErr: make(chan error), + Rdbw: redisConn, + } +} + +func TestHA_NotificationListeners(t *testing.T) { + super := GetSuper() + chHA := make(chan ha.State) + haInst := NewConfigSyncHA(super, chHA) + + chHost := haInst.RegisterNotificationListener("host") + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + assert.Equal(t, Notify_StartSync, <-chHost) + wg.Done() + }() + + haInst.notifyNotificationListener("host", Notify_StartSync) + wg.Wait() + + chService := haInst.RegisterNotificationListener("service") + wg.Add(1) + + go func() { + assert.Equal(t, Notify_StartSync, <-chService) + wg.Done() + }() + + haInst.notifyNotificationListener("service", Notify_StartSync) + wg.Wait() + + wg.Add(1) + + go func() { + assert.Equal(t, Notify_StartSync, <-chService) + assert.Equal(t, Notify_StartSync, <-chHost) + wg.Done() + }() + + haInst.notifyNotificationListener("*", Notify_StartSync) + wg.Wait() +} + +func TestHA_EventListener(t *testing.T) { + super := GetSuper() + chHA := make(chan ha.State) + haInst := NewConfigSyncHA(super, chHA) + + haInst.Start() + defer haInst.Stop() + + chHA <- ha.StateActive + + testbackends.RedisTestClient.Del("icinga:dump") + + chHost := haInst.RegisterNotificationListener("host") + chService := haInst.RegisterNotificationListener("service") + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + assert.Equal(t, Notify_StartSync, <-chHost) + assert.Equal(t, Notify_StopSync, <-chHost) + assert.Equal(t, Notify_StartSync, <-chHost) + assert.Equal(t, Notify_StopSync, <-chHost) + wg.Done() + }() + + go func() { + assert.Equal(t, Notify_StartSync, <-chService) + assert.Equal(t, Notify_StopSync, <-chService) + assert.Equal(t, Notify_StartSync, <-chService) + wg.Done() + }() + + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}}) + testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}}) + + wg.Wait() +} diff --git a/connection/mysql.go b/connection/mysql.go index c91a1973..80fe3a30 100644 --- a/connection/mysql.go +++ b/connection/mysql.go @@ -300,12 +300,6 @@ func (dbw *DBWrapper) SqlCommit(tx DbTransaction, quiet bool) error { }).Debug("COMMIT transaction") } - if err != nil { - if dbw.isConnectionError(err) { - continue - } - } - return err } } @@ -335,12 +329,6 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error { err = tx.Rollback() } - if err != nil { - if dbw.isConnectionError(err) { - continue - } - } - return err } } @@ -382,7 +370,7 @@ func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryObserver prometh } // sqlExecInternal is a wrapper around sql.Exec() for auto-logging. -func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, sql string, quiet bool, args ...interface{}) (sql.Result, error) { +func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, query string, quiet bool, args ...interface{}) (sql.Result, error) { for { if !dbw.IsConnected() { dbw.WaitForConnection() @@ -394,7 +382,7 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prome benchmarc = utils.NewBenchmark() } - res, err := db.Exec(sql, args...) + res, err := db.Exec(query, args...) DbOperationsExec.Inc() if !quiet { @@ -408,12 +396,12 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prome "benchmark": benchmarc, "affected_rows": prettyPrintedRowsAffected{res}, "args": prettyPrintedArgs{args}, - "query": prettyPrintedSql{sql}, + "query": prettyPrintedSql{query}, }).Debug("Finished Exec") } if err != nil { - if dbw.isConnectionError(err) { + if _, isTx := db.(DbTransaction); !isTx && dbw.isConnectionError(err) { continue } } @@ -433,10 +421,8 @@ func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryObserve res, err := sqlTryFetchAll(db, queryObserver, query, quiet, args...) if err != nil { - if _, isDb := db.(*sql.DB); isDb { - if dbw.isConnectionError(err) { - continue - } + if _, isTx := db.(DbTransaction); !isTx && dbw.isConnectionError(err) { + continue } } diff --git a/connection/mysql_test.go b/connection/mysql_test.go index 32bc7f7b..da119bf5 100644 --- a/connection/mysql_test.go +++ b/connection/mysql_test.go @@ -107,36 +107,6 @@ func TestDBWrapper_CheckConnection(t *testing.T) { assert.Equal(t, uint32(11), atomic.LoadUint32(dbw.ConnectionLostCounterAtomic)) } -func TestDBWrapper_SqlCommit(t *testing.T) { - mockDb := new(DbMock) - dbw := NewTestDBW(mockDb) - mockTx := new(TransactionMock) - - mockTx.On("Commit").Return(errors.New("whoops")).Once() - mockTx.On("Commit").Return(nil).Once() - mockDb.On("Ping").Return(errors.New("whoops")).Once() - - var err error - done := make(chan bool) - - dbw.CompareAndSetConnected(true) - go func() { - err = dbw.SqlCommit(mockTx, false) - done <- true - }() - - time.Sleep(time.Millisecond * 50) - - dbw.CompareAndSetConnected(true) - dbw.ConnectionUpCondition.Broadcast() - - <-done - - assert.NoError(t, err) - mockTx.AssertExpectations(t) - mockDb.AssertExpectations(t) -} - func TestDBWrapper_SqlBegin(t *testing.T) { mockDb := new(DbMock) dbw := NewTestDBW(mockDb) diff --git a/ha/ha.go b/ha/ha.go index 3a15025d..8ec1c0cb 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -10,7 +10,6 @@ import ( "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" - "github.com/go-redis/redis/v7" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -18,30 +17,30 @@ import ( "time" ) -const ( - Notify_StartSync = iota - Notify_StopSync -) - type HA struct { - isActive bool - lastHeartbeat int64 - uid uuid.UUID - super *supervisor.Supervisor - notificationListeners map[string][]chan int - notificationListenersMutex sync.Mutex - lastEventId string - logger *log.Entry - heartbeatTimer *time.Timer + state State + stateChangeListeners []chan State + stateChangeListenersMutex sync.Mutex + lastHeartbeat int64 + uid uuid.UUID + super *supervisor.Supervisor + logger *log.Entry + dbw *connection.DBWrapper } -func NewHA(super *supervisor.Supervisor) (*HA, error) { +const ( + // We consider heartbeats valid for 10 seconds + heartbeatValidMillisecs = 10 * 1000 + + // We consider the heartbeat of another instance to be expired 5 seconds after its validity ended + heartbeatTimeoutMillisecs = heartbeatValidMillisecs + 5*1000 +) + +func NewHA(super *supervisor.Supervisor, dbw *connection.DBWrapper) (*HA, error) { var err error ho := HA{ - super: super, - notificationListeners: make(map[string][]chan int), - notificationListenersMutex: sync.Mutex{}, - lastEventId: "0-0", + super: super, + dbw: dbw, } if ho.uid, err = uuid.NewRandom(); err != nil { @@ -52,116 +51,131 @@ func NewHA(super *supervisor.Supervisor) (*HA, error) { } var mysqlObservers = struct { - updateIcingadbInstanceById prometheus.Observer - updateIcingadbInstanceByEnvironmentId prometheus.Observer - insertIntoIcingadbInstance prometheus.Observer - insertIntoEnvironment prometheus.Observer - selectIdHeartbeatFromIcingadbInstanceByEnvironmentId prometheus.Observer + updateIcingadbInstanceById prometheus.Observer + updateIcingadbInstanceByEnvironmentId prometheus.Observer + insertIntoIcingadbInstance prometheus.Observer + insertIntoEnvironment prometheus.Observer + selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId prometheus.Observer + selectHeartbeatResponsibleFromIcingadbInstanceById prometheus.Observer + deleteIcingadbInstanceByEndpointId prometheus.Observer }{ connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"), connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"), connection.DbIoSeconds.WithLabelValues("mysql", "insert into icingadb_instance"), connection.DbIoSeconds.WithLabelValues("mysql", "insert into environment"), - connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat from icingadb_instance where environment_id = ourEnvID"), + connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat, responsible from icingadb_instance where environment_id = ourEnvID"), + connection.DbIoSeconds.WithLabelValues("mysql", "select heartbeat, responsible from icingadb_instance by id"), + connection.DbIoSeconds.WithLabelValues("mysql", "delete from icingadb_instance by endpoint_id"), } -func (h *HA) updateOwnInstance(env *Environment) error { - _, err := h.super.Dbw.SqlExec( - mysqlObservers.insertIntoIcingadbInstance, - "REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, heartbeat, responsible,"+ +func (h *HA) setState(state State) { + switch state { + // valid new states (no action needed) + case StateActive: + case StateOtherActive: + case StateAllInactive: + case StateInactiveUnkown: + + // invalid arguments + case StateInit: + h.logger.Fatal("Must not set HA state to StateInit") + default: + h.logger.Fatalf("Trying to change to invalid HA state %d", state) + } + + if state != h.state { + if h.state != StateInit { + h.logger.Infof("Changing HA state to %s (was %s)", state.String(), h.state.String()) + } else { + h.logger.Infof("Changing HA state to %s", state.String()) + } + + h.state = state + h.notifyStateChangeListeners(state) + } +} + +func (h *HA) upsertInstance(tx connection.DbTransaction, env *Environment, isActive bool) error { + if isActive { + // If we are active or become active, ensure that no other instance has the active flag set. + _, err := h.dbw.SqlExecTx(tx, mysqlObservers.updateIcingadbInstanceByEnvironmentId, + "UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND responsible = ?", + utils.Bool[false], h.super.EnvId, utils.Bool[true]) + if err != nil { + return err + } + } + + _, err := h.dbw.SqlExecTx( + tx, mysqlObservers.insertIntoIcingadbInstance, + "REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ " icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+ " icinga2_active_service_checks_enabled, icinga2_active_host_checks_enabled,"+ " icinga2_event_handlers_enabled, icinga2_flap_detection_enabled,"+ - " icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, 'y', ?, ?, ?, ?, ?, ?, ?, ?)", - h.uid[:], - h.super.EnvId, - env.Icinga2.EndpointId, - h.lastHeartbeat, - env.Icinga2.Version, - int64(env.Icinga2.ProgramStart*1000), - utils.Bool[env.Icinga2.NotificationsEnabled], - utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], - utils.Bool[env.Icinga2.ActiveHostChecksEnabled], - utils.Bool[env.Icinga2.EventHandlersEnabled], - utils.Bool[env.Icinga2.FlapDetectionEnabled], - utils.Bool[env.Icinga2.PerformanceDataEnabled], + " icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + h.uid[:], // id + h.super.EnvId, // environment_id + env.Icinga2.EndpointId, // endpoint_id + utils.Bool[isActive], // responsible + h.lastHeartbeat, // heartbeat + env.Icinga2.Version, // icinga2_version + int64(env.Icinga2.ProgramStart*1000), // icinga2_start_time + utils.Bool[env.Icinga2.NotificationsEnabled], // icinga2_notifications_enabled + utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], // icinga2_active_service_checks_enabled + utils.Bool[env.Icinga2.ActiveHostChecksEnabled], // icinga2_active_host_checks_enabled + utils.Bool[env.Icinga2.EventHandlersEnabled], // icinga2_event_handlers_enabled + utils.Bool[env.Icinga2.FlapDetectionEnabled], // icinga2_flap_detection_enabled + utils.Bool[env.Icinga2.PerformanceDataEnabled], // icinga2_performance_data_enabled ) return err } -func (h *HA) takeOverInstance(env *Environment) error { - _, err := h.super.Dbw.SqlExec( - mysqlObservers.updateIcingadbInstanceByEnvironmentId, - "UPDATE icingadb_instance SET id = ?, endpoint_id = ?, heartbeat = ?,"+ - " icinga2_version = ?, icinga2_start_time = ?, icinga2_notifications_enabled = ?,"+ - " icinga2_active_service_checks_enabled = ?, icinga2_active_host_checks_enabled = ?,"+ - " icinga2_event_handlers_enabled = ?, icinga2_flap_detection_enabled = ?,"+ - " icinga2_performance_data_enabled = ? WHERE environment_id = ?", - h.uid[:], - env.Icinga2.EndpointId, - h.lastHeartbeat, - env.Icinga2.Version, - int64(env.Icinga2.ProgramStart*1000), - utils.Bool[env.Icinga2.NotificationsEnabled], - utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], - utils.Bool[env.Icinga2.ActiveHostChecksEnabled], - utils.Bool[env.Icinga2.EventHandlersEnabled], - utils.Bool[env.Icinga2.FlapDetectionEnabled], - utils.Bool[env.Icinga2.PerformanceDataEnabled], - h.super.EnvId, +func (h *HA) getActiveInstance(tx connection.DbTransaction) (bool, uuid.UUID, error) { + rows, err := h.dbw.SqlFetchAllTx( + tx, mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId, + "SELECT id, heartbeat FROM icingadb_instance"+ + " WHERE environment_id = ? AND responsible = ? AND heartbeat > ?", + h.super.EnvId, utils.Bool[true], utils.TimeToMillisecs(time.Now())-heartbeatTimeoutMillisecs, ) - return err -} - -func (h *HA) insertInstance(env *Environment) error { - _, err := h.super.Dbw.SqlExec( - mysqlObservers.insertIntoIcingadbInstance, - "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, heartbeat, responsible,"+ - " icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+ - " icinga2_active_service_checks_enabled, icinga2_active_host_checks_enabled,"+ - " icinga2_event_handlers_enabled, icinga2_flap_detection_enabled,"+ - " icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, 'y', ?, ?, ?, ?, ?, ?, ?, ?)", - h.uid[:], - h.super.EnvId, - env.Icinga2.EndpointId, - h.lastHeartbeat, - env.Icinga2.Version, - int64(env.Icinga2.ProgramStart*1000), - utils.Bool[env.Icinga2.NotificationsEnabled], - utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], - utils.Bool[env.Icinga2.ActiveHostChecksEnabled], - utils.Bool[env.Icinga2.EventHandlersEnabled], - utils.Bool[env.Icinga2.FlapDetectionEnabled], - utils.Bool[env.Icinga2.PerformanceDataEnabled], - ) - return err -} - -func (h *HA) getInstance() (bool, uuid.UUID, int64, error) { - rows, err := h.super.Dbw.SqlFetchAll( - mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId, - "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", - h.super.EnvId, - ) - if err != nil { - return false, uuid.UUID{}, 0, err + return false, uuid.UUID{}, err } + if len(rows) > 1 { + return false, uuid.UUID{}, errors.New("there is more than one active IcingaDB instance") + } + if len(rows) == 0 { - return false, uuid.UUID{}, 0, nil + // No active instance according to database. + return false, uuid.UUID{}, nil } - var theirUUID uuid.UUID - copy(theirUUID[:], rows[0][0].([]byte)) + idBytes := rows[0][0].([]byte) + icinga2Heartbeat := rows[0][1].(int64) - return true, theirUUID, rows[0][1].(int64), nil + activeId, err := uuid.FromBytes(idBytes) + if err != nil { + return false, uuid.UUID{}, fmt.Errorf("invalid active UUID in database: %s", err.Error()) + } + + icinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - icinga2Heartbeat + + if activeId == h.uid && icinga2HeartbeatAge > heartbeatValidMillisecs { + // Our heartbeat is too old to be considered valid, no longer consider ourselves to be active. + return false, uuid.UUID{}, nil + } else if activeId != h.uid && icinga2HeartbeatAge > heartbeatTimeoutMillisecs { + // Their heartbeat is old enough to be considered timed out, no longer consider them to be active. + return false, uuid.UUID{}, nil + } + + return true, activeId, nil } func (h *HA) StartHA(chEnv chan *Environment) { env := h.waitForEnvironment(chEnv) + h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) err := h.setAndInsertEnvironment(env) if err != nil { - h.super.ChErr <- fmt.Errorf("Could not insert environment into MySQL: %s", err.Error()) + h.super.ChErr <- fmt.Errorf("could not insert environment into MySQL: %s", err.Error()) } h.logger = log.WithFields(log.Fields{ @@ -172,13 +186,7 @@ func (h *HA) StartHA(chEnv chan *Environment) { h.logger.Info("Got initial environment.") - h.checkResponsibility(env) - - h.heartbeatTimer = time.NewTimer(time.Second * 15) - - for { - h.runHA(chEnv) - } + h.runHA(chEnv, env) } func (h *HA) waitForEnvironment(chEnv chan *Environment) *Environment { @@ -198,7 +206,7 @@ func (h *HA) waitForEnvironment(chEnv chan *Environment) *Environment { func (h *HA) setAndInsertEnvironment(env *Environment) error { h.super.EnvId = env.ID - _, err := h.super.Dbw.SqlExec( + _, err := h.dbw.SqlExec( mysqlObservers.insertIntoEnvironment, "REPLACE INTO environment(id, name) VALUES (?, ?)", env.ID, env.Name, @@ -207,151 +215,163 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error { return err } +// Remove rows from icingadb_instance that were created by previous startups of this instance. +// A row is considered to be created by this instance if it shares the same environment_id and +// endpoint_id. Rows with a recent heartbeat are never removed. +func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environment) error { + heartbeatTimeoutThreshold := utils.TimeToMillisecs(time.Now()) - heartbeatTimeoutMillisecs + _, err := h.dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId, + "DELETE FROM icingadb_instance "+ + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", + h.uid[:], h.super.EnvId, env.Icinga2.EndpointId, heartbeatTimeoutThreshold) + return err +} + func (h *HA) checkResponsibility(env *Environment) { - found, _, beat, err := h.getInstance() - if err != nil { - h.logger.Errorf("Failed to fetch instance: %v", err) - h.super.ChErr <- errors.New("failed to fetch instance") - return - } - - if utils.TimeToMillisecs(time.Now())-beat > 15*1000 { - h.logger.Info("Taking over.") - - // This means there was no instance row match, insert - if !found { - err = h.insertInstance(env) - } else { - err = h.takeOverInstance(env) - } + start := time.Now() + var newState State + err := h.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + err := h.removePreviousInstances(tx, env) if err != nil { - h.logger.Errorf("Failed to insert/update instance: %v", err) - h.super.ChErr <- errors.New("failed to insert/update instance") - return + return err } - h.isActive = true - } else { - h.logger.Info("Other instance is active.") - h.isActive = false - h.lastEventId = "0-0" - } -} - -func (h *HA) runHA(chEnv chan *Environment) { - select { - case env := <-chEnv: - if bytes.Compare(env.ID, h.super.EnvId) != 0 { - h.logger.Error("Received environment is not the one we expected. Panic.") - h.super.ChErr <- errors.New("received unexpected environment") - return + foundActive, activeId, err := h.getActiveInstance(tx) + if err != nil { + return err } - h.heartbeatTimer.Reset(time.Second * 15) - previous := h.lastHeartbeat - h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + lastIcinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - h.lastHeartbeat + lastIcinga2HeartbeatValid := lastIcinga2HeartbeatAge < heartbeatValidMillisecs - if h.lastHeartbeat-previous < 10*1000 && h.isActive { - err := h.updateOwnInstance(env) - - if err != nil { - h.logger.Errorf("Failed to update instance: %v", err) - h.super.ChErr <- errors.New("failed to update instance") - return + if foundActive { + if activeId == h.uid { + if lastIcinga2HeartbeatValid { + // We are active according to the DB and have a valid heartbeat, keep it that way. + newState = StateActive + } else { + // We are active according to the DB but our heartbeat from Icinga 2 is no longer valid. + // Give up active state so that another instance has a chance to take over. + h.logger.Info("Becoming inactive due to expired Icinga 2 heartbeat.") + newState = StateAllInactive + } + } else { + // Some other instance is active, remain passive + newState = StateOtherActive } } else { - _, they, beat, err := h.getInstance() - if err != nil { - h.logger.Errorf("Failed to fetch instance: %v", err) - h.super.ChErr <- errors.New("failed to fetch instance") - return - } - if they == h.uid { - h.logger.Debug("We are active.") - if !h.isActive { - h.logger.Info("Icinga 2 sent heartbeat. Starting sync") - h.isActive = true - } - - if err := h.updateOwnInstance(env); err != nil { - h.logger.Errorf("Failed to update instance: %v", err) - h.super.ChErr <- errors.New("failed to update instance") - return - } - } else if h.lastHeartbeat-beat > 15*1000 { - h.logger.Info("Taking over.") - if err := h.takeOverInstance(env); err != nil { - h.logger.Errorf("Failed to update instance: %v", err) - h.super.ChErr <- errors.New("failed to update instance") - } - h.isActive = true + // No instance is currently active. Try take over, but only + // if we are actively receiving heartbeats from Icinga 2. + if lastIcinga2HeartbeatValid { + h.logger.Info("No active instance, trying to take over.") + newState = StateActive } else { - h.logger.Debug("Other instance is active.") + if h.state == StateActive { + // We are active according to our last state, however when + // reading from the database our heartbeat expired already. + h.logger.Info("Becoming inactive due to expired Icinga 2 heartbeat.") + } + newState = StateAllInactive } } - case <-h.heartbeatTimer.C: - h.logger.Info("Icinga 2 sent no heartbeat for 15 seconds. Pausing sync") - h.isActive = false - h.lastEventId = "0-0" - h.notifyNotificationListener("*", Notify_StopSync) + + err = h.upsertInstance(tx, env, newState == StateActive) + if err != nil { + return err + } + + return nil + }) + if err != nil { + // Transaction failed, we are not sure about the current global state. + // In any case, we ensure that we are no longer active. + h.super.ChErr <- errors.New("HA heartbeat failed") + h.logger.Errorf("HA heartbeat failed: %s", err.Error()) + newState = StateInactiveUnkown } + + txDuration := time.Since(start) + icinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - h.lastHeartbeat + if newState == StateActive && txDuration > heartbeatValidMillisecs*time.Millisecond/2 { + // The SQL transaction is too slow if it takes more than half the heartbeat validity + // period as in this case, we cannot expect to renew the heartbeat in time. + h.logger.Warnf("SQL transaction took %s, too slow to keep our heartbeat alive. "+ + "Check the health of your database.", txDuration) + newState = StateAllInactive + } else if newState == StateActive && icinga2HeartbeatAge > heartbeatValidMillisecs { + // If this was a forced periodic update, the heartbeat might also have + // expired during the execution of the SQL transaction. + h.logger.Warnf("Icinga 2 heartbeat expired during SQL transaction") + newState = StateAllInactive + } + + h.setState(newState) } -func (h *HA) StartEventListener() { - every1s := time.NewTicker(time.Second) +func (h *HA) runHA(chEnv chan *Environment, env *Environment) { + // Force regular Icinga DB heartbeat writes to the database even if we receive no heartbeats from + // Icinga 2. Icinga 2 will send an heartbeat every second, so use two seconds here to avoid + // situations like forcing the update right before we receive the next heartbeat from Icinga 2. + const updateTimerDuration = 2 * time.Second + + updateTimer := time.NewTimer(updateTimerDuration) for { - <-every1s.C - h.runEventListener() + var newEnv *Environment + + // Selecting from multiple channels does not guarantee which case gets executed if multiple ones + // are ready. However, when both a new environment is available and the update timer expired, we + // want to prefer the new environment. Therefore first try to select only from chEnv and only if + // there is nothing in the channel, i.e. in the default case, select from both channels. + select { + case newEnv = <-chEnv: + default: + select { + case newEnv = <-chEnv: + case <-updateTimer.C: + } + } + + if newEnv != nil { + env = newEnv + if bytes.Compare(env.ID, h.super.EnvId) != 0 { + h.logger.Error("Received environment is not the one we expected. Panic.") + h.super.ChErr <- errors.New("received unexpected environment") + return + } + h.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + } + + updateTimer.Reset(updateTimerDuration) + + h.checkResponsibility(env) } } -func (h *HA) runEventListener() { - if !h.isActive { - return - } +func (h *HA) RegisterStateChangeListener() <-chan State { + // The channel has a buffer of size so that it can hold the most recent state. If it is full when we try to write, + // we chan just drain it as the element it contains is outdated anyways. + ch := make(chan State, 1) - result := h.super.Rdbw.XRead(&redis.XReadArgs{Block: -1, Streams: []string{"icinga:dump", h.lastEventId}}) - streams, err := result.Result() - if err != nil { - if err.Error() != "redis: nil" { - h.super.ChErr <- err - } - return - } + h.stateChangeListenersMutex.Lock() + defer h.stateChangeListenersMutex.Unlock() - events := streams[0].Messages - if len(events) == 0 { - return - } - - for _, event := range events { - h.lastEventId = event.ID - values := event.Values - - if values["state"] == "done" { - h.notifyNotificationListener(values["type"].(string), Notify_StartSync) - } else { - h.notifyNotificationListener(values["type"].(string), Notify_StopSync) - } - } -} - -func (h *HA) RegisterNotificationListener(listenerType string) chan int { - ch := make(chan int, 10) - h.notificationListenersMutex.Lock() - h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch) - h.notificationListenersMutex.Unlock() + h.stateChangeListeners = append(h.stateChangeListeners, ch) return ch } -func (h *HA) notifyNotificationListener(listenerType string, msg int) { - for t, chs := range h.notificationListeners { - if t == listenerType || listenerType == "*" { - for _, c := range chs { - c <- msg - } +func (h *HA) notifyStateChangeListeners(state State) { + h.stateChangeListenersMutex.Lock() + defer h.stateChangeListenersMutex.Unlock() + + for _, ch := range h.stateChangeListeners { + // drain the channel + select { + case <-ch: + default: } + + ch <- state } } diff --git a/ha/ha_test.go b/ha/ha_test.go index 6961322c..c0db909e 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -4,42 +4,37 @@ package ha import ( "crypto/sha1" + "encoding/hex" "github.com/Icinga/icingadb/config/testbackends" "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/supervisor" "github.com/Icinga/icingadb/utils" - "github.com/go-redis/redis/v7" "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "strconv" "sync" "testing" "time" ) func createTestingHA(t *testing.T, redisAddr string) *HA { - redisConn := connection.NewRDBWrapper(redisAddr, "", 64) - - mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) - if err != nil { - assert.Fail(t, "This test needs a working MySQL connection!") - } + mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 1) + require.NoError(t, err, "This test needs a working MySQL connection!") super := supervisor.Supervisor{ ChErr: make(chan error), - Rdbw: redisConn, - Dbw: mysqlConn, } - ha, _ := NewHA(&super) + ha, _ := NewHA(&super, mysqlConn) hash := sha1.New() hash.Write([]byte("derp")) ha.super.EnvId = hash.Sum(nil) ha.uid = uuid.MustParse("551bc748-94b2-4d27-b6a4-15c52aecfe85") - _, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") + _, err = ha.dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") require.NoError(t, err, "This test needs a working MySQL connection!") ha.logger = log.WithFields(log.Fields{ @@ -50,16 +45,52 @@ func createTestingHA(t *testing.T, redisAddr string) *HA { return ha } +func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ([]*HA, <-chan error) { + mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50) + require.NoError(t, err, "This test needs a working MySQL connection!") + + _, err = mysqlConn.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") + require.NoError(t, err, "This test needs a working MySQL connection!") + + instances := make([]*HA, numInstances) + chErr := make(chan error) + + for i := 0; i < numInstances; i++ { + + super := supervisor.Supervisor{ + ChErr: chErr, + } + + ha, _ := NewHA(&super, mysqlConn) + + hash := sha1.New() + hash.Write([]byte("derp")) + ha.super.EnvId = hash.Sum(nil) + ha.uid = uuid.NewSHA1(uuid.MustParse("551bc748-94b2-4d27-b6a4-15c52aecfe85"), []byte(strconv.Itoa(i))) + + ha.logger = log.WithFields(log.Fields{ + "context": "HA-Testing", + "UUID": ha.uid, + }) + + instances[i] = ha + } + + return instances, chErr +} + var mysqlTestObserver = connection.DbIoSeconds.WithLabelValues("mysql", "test") -func TestHA_InsertInstance(t *testing.T) { +func TestHA_UpsertInstance(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) - err := ha.insertInstance(&Environment{}) - require.NoError(t, err, "insertInstance should not return an error") + err := ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + return ha.upsertInstance(tx, &Environment{}, false) + }) + require.NoError(t, err, "transaction running upsertInstance should not return an error") - rows, err := ha.super.Dbw.SqlFetchAll( - mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId, + rows, err := ha.dbw.SqlFetchAll( + mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId, "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", ha.super.EnvId, ) @@ -73,41 +104,139 @@ func TestHA_InsertInstance(t *testing.T) { assert.Equal(t, ha.uid, theirUUID, "UUID must match") } -func TestHA_checkResponsibility(t *testing.T) { +func TestHA_checkResponsibility_NoOtherInstance(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) + + now := utils.TimeToMillisecs(time.Now()) + ha.lastHeartbeat = now ha.checkResponsibility(&Environment{}) - assert.Equal(t, true, ha.isActive, "HA should be responsible, if no other instance is active") + assert.Equal(t, StateActive, ha.state, "HA should be active if no other instance exists") +} - _, err := ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") - require.NoError(t, err, "This test needs a working MySQL connection!") +func TestHA_checkResponsibility_OtherInactiveInstance(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) - _, err = ha.super.Dbw.SqlExec( + now := utils.TimeToMillisecs(time.Now()) + + otherUuid, err := uuid.NewRandom() + assert.NoError(t, err, "UUID generation failed") + + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, - "INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible, icinga2_version, icinga2_start_time) VALUES (?, ?, ?, 'y', '', 0)", - ha.uid[:], ha.super.EnvId, 0, + "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?)", + otherUuid[:], ha.super.EnvId, utils.Bool[false], now, "", 0, ) - require.NoError(t, err, "This test needs a working MySQL connection!") - ha.isActive = false + ha.lastHeartbeat = now ha.checkResponsibility(&Environment{}) - assert.Equal(t, true, ha.isActive, "HA should be responsible, if another instance was inactive for a long time") + assert.Equal(t, StateActive, ha.state, "HA should be active if there is only an inactive instance") +} - _, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance") - require.NoError(t, err, "This test needs a working MySQL connection!") +func TestHA_checkResponsibility_OtherTimedOutInstance(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) - _, err = ha.super.Dbw.SqlExec( + now := utils.TimeToMillisecs(time.Now()) + timedOut := now - heartbeatTimeoutMillisecs + + otherUuid, err := uuid.NewRandom() + assert.NoError(t, err, "UUID generation failed") + + _, err = ha.dbw.SqlExec( mysqlObservers.insertIntoIcingadbInstance, - "INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible, icinga2_version, icinga2_start_time) VALUES (?, ?, ?, 'y', '', 0)", - ha.uid[:], ha.super.EnvId, utils.TimeToMillisecs(time.Now()), + "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?)", + otherUuid[:], ha.super.EnvId, utils.Bool[true], timedOut, "", 0, ) + require.NoError(t, err, "This test needs a working MySQL connection!") - ha.isActive = false + ha.lastHeartbeat = now ha.checkResponsibility(&Environment{}) - assert.Equal(t, false, ha.isActive, "HA should not be responsible, if another instance is active") + assert.Equal(t, StateActive, ha.state, "HA should be active if another instance is timed out") +} + +func TestHA_checkResponsibility_OtherActiveInstance(t *testing.T) { + ha := createTestingHA(t, testbackends.RedisTestAddr) + + now := utils.TimeToMillisecs(time.Now()) + + otherUuid, err := uuid.NewRandom() + assert.NoError(t, err, "UUID generation failed") + + _, err = ha.dbw.SqlExec( + mysqlObservers.insertIntoIcingadbInstance, + "INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?)", + otherUuid[:], ha.super.EnvId, utils.Bool[true], now, "", 0, + ) + require.NoError(t, err, "This test needs a working MySQL connection!") + + ha.lastHeartbeat = now + ha.checkResponsibility(&Environment{}) + + assert.Equal(t, StateOtherActive, ha.state, "HA should not be active if another instance is active") +} + +func TestHA_checkResponsibility_Concurrent(t *testing.T) { + numAttempts := 10 + numConcurrentTakeovers := 2 + failed := false + + for attempt := 0; !failed && attempt < numAttempts; attempt++ { + wg := sync.WaitGroup{} + wg.Add(numConcurrentTakeovers) + + haInstances, chErr := createTestingMultipleHA(t, testbackends.RedisTestAddr, numConcurrentTakeovers) + for _, ha := range haInstances { + ha.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + } + + for _, ha := range haInstances { + ha := ha + go func() { + defer wg.Done() + ha.checkResponsibility(&Environment{}) + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + loop: + for { + select { + case err := <-chErr: + assert.NoError(t, err, "checkResponsibility() should return no error") + if err != nil { + failed = true + } + case <-done: + break loop + } + } + + numActive := 0 + for _, ha := range haInstances { + if ha.state == StateActive { + numActive++ + } + } + + assert.Equal(t, 1, numActive, "exactly 1 instance must be active after checkResponsibility() but %d are active", numActive) + if numActive != 1 { + failed = true + } + } } func TestHA_waitForEnvironment(t *testing.T) { @@ -156,7 +285,7 @@ func TestHA_setAndInsertEnvironment(t *testing.T) { err := ha.setAndInsertEnvironment(&env) require.NoError(t, err, "setAndInsertEnvironment should not return an error") - rows, err := ha.super.Dbw.SqlFetchAll( + rows, err := ha.dbw.SqlFetchAll( mysqlTestObserver, "SELECT name from environment where id = ? LIMIT 1", ha.super.EnvId, @@ -169,7 +298,6 @@ func TestHA_setAndInsertEnvironment(t *testing.T) { func TestHA_runHA(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) - ha.heartbeatTimer = time.NewTimer(10 * time.Second) chEnv := make(chan *Environment) @@ -199,82 +327,85 @@ func TestHA_runHA(t *testing.T) { wg.Done() }() - ha.runHA(chEnv) + ha.runHA(chEnv, &Environment{}) wg.Wait() } -func TestHA_NotificationListeners(t *testing.T) { +func TestHA_RegisterStateChangeListener(t *testing.T) { ha := createTestingHA(t, testbackends.RedisTestAddr) - chHost := ha.RegisterNotificationListener("host") - wg := sync.WaitGroup{} - wg.Add(1) + assertNonBlockingState := func(expected State, ch <-chan State, msgAndArgs ...interface{}) { + select { + case actual := <-ch: + assert.Equal(t, expected, actual, msgAndArgs...) + default: + assert.Fail(t, "reading from channel should not block", msgAndArgs) + } + } - go func() { - assert.Equal(t, Notify_StartSync, <-chHost) - wg.Done() - }() + chHA := ha.RegisterStateChangeListener() - ha.notifyNotificationListener("host", Notify_StartSync) - wg.Wait() + ha.lastHeartbeat = utils.TimeToMillisecs(time.Now()) + ha.checkResponsibility(&Environment{}) + assertNonBlockingState(StateActive, chHA, "HA should send StateActive to state change channel when becoming active") - chService := ha.RegisterNotificationListener("service") - wg.Add(1) - - go func() { - assert.Equal(t, Notify_StartSync, <-chService) - wg.Done() - }() - - ha.notifyNotificationListener("service", Notify_StartSync) - wg.Wait() - - wg.Add(1) - - go func() { - assert.Equal(t, Notify_StartSync, <-chService) - assert.Equal(t, Notify_StartSync, <-chHost) - wg.Done() - }() - - ha.notifyNotificationListener("*", Notify_StartSync) - wg.Wait() + ha.lastHeartbeat = 0 + ha.checkResponsibility(&Environment{}) + assertNonBlockingState(StateAllInactive, chHA, "HA should send StateAllInactive to state change channel when becoming inactive") } -func TestHA_EventListener(t *testing.T) { +func TestHA_removePreviousInstances(t *testing.T) { + env := &Environment{} + env.Name = "test" + env.ID = Sha1bytes([]byte(env.Name)) + env.Icinga2.EndpointId = make([]byte, 20) + ha := createTestingHA(t, testbackends.RedisTestAddr) - ha.isActive = true - go ha.StartEventListener() + err := ha.setAndInsertEnvironment(env) + require.NoError(t, err, "setAndInsertEnvironment should not return an error") - testbackends.RedisTestClient.Del("icinga:dump") + err = ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + return ha.upsertInstance(tx, &Environment{}, false) + }) + require.NoError(t, err, "upsertInstance() should not return an error") - chHost := ha.RegisterNotificationListener("host") - chService := ha.RegisterNotificationListener("service") + now := utils.TimeToMillisecs(time.Now()) + activeUuid, err := uuid.NewRandom() + require.NoError(t, err, "UUID generation failed") + _, err = ha.dbw.SqlExec( + mysqlObservers.insertIntoIcingadbInstance, + "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?, ?)", + activeUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], now, "", 0, + ) + require.NoError(t, err, "This test needs a working MySQL connection!") - wg := sync.WaitGroup{} - wg.Add(2) + timedOut := now - heartbeatTimeoutMillisecs + timedOutUuid, err := uuid.NewRandom() + require.NoError(t, err, "UUID generation failed") + _, err = ha.dbw.SqlExec( + mysqlObservers.insertIntoIcingadbInstance, + "INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+ + " icinga2_version, icinga2_start_time)"+ + " VALUES (?, ?, ?, ?, ?, ?, ?)", + timedOutUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], timedOut, "", 0, + ) + require.NoError(t, err, "This test needs a working MySQL connection!") - go func() { - assert.Equal(t, Notify_StartSync, <-chHost) - assert.Equal(t, Notify_StopSync, <-chHost) - assert.Equal(t, Notify_StartSync, <-chHost) - assert.Equal(t, Notify_StopSync, <-chHost) - wg.Done() - }() + err = ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error { + return ha.removePreviousInstances(tx, env) + }) + assert.NoError(t, err, "removePreviousInstances() should not return an error") - go func() { - assert.Equal(t, Notify_StartSync, <-chService) - assert.Equal(t, Notify_StopSync, <-chService) - assert.Equal(t, Notify_StartSync, <-chService) - wg.Done() - }() + rows, err := ha.dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance") + var instanceIds []string + for _, row := range rows { + instanceIds = append(instanceIds, hex.EncodeToString(row[0].([]byte))) + } - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}}) - testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}}) - - wg.Wait() + assert.Contains(t, instanceIds, hex.EncodeToString(ha.uid[:]), "removePreviousInstance() should not remove its own row") + assert.Contains(t, instanceIds, hex.EncodeToString(activeUuid[:]), "removePreviousInstance() should not remove rows that are not timed out yet") + assert.NotContains(t, instanceIds, hex.EncodeToString(timedOutUuid[:]), "removePreviousInstances() should remove timed out instance") } diff --git a/ha/heartbeat.go b/ha/heartbeat.go index 56260c7e..db8f933f 100644 --- a/ha/heartbeat.go +++ b/ha/heartbeat.go @@ -5,12 +5,10 @@ package ha import ( "crypto/sha1" "encoding/json" - "fmt" "github.com/Icinga/icingadb/connection" "github.com/Icinga/icingadb/utils" "github.com/go-redis/redis/v7" log "github.com/sirupsen/logrus" - "time" ) type Environment struct { @@ -43,7 +41,7 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment log.Info("Starting heartbeat listener") xReadArgs := redis.XReadArgs{ - Streams: []string{"icinga:stats", fmt.Sprintf("%d-0", utils.TimeToMillisecs(time.Now().Add(-15*time.Second)))}, + Streams: []string{"icinga:stats", "$"}, Count: 1, Block: 0, } diff --git a/ha/state.go b/ha/state.go new file mode 100644 index 00000000..3c363627 --- /dev/null +++ b/ha/state.go @@ -0,0 +1,28 @@ +package ha + +type State uint8 + +const ( + StateInit State = iota // Initial state when starting + StateActive // This instance is active + StateOtherActive // This instance is inactive but there is another one that is active + StateAllInactive // All known instances are inactive (i.e. none receives Icinga 2 heartbeats) + StateInactiveUnkown // This instance is inactive but does not known about the state of other instances +) + +func (s State) String() string { + switch s { + case StateInit: + return "init" + case StateActive: + return "active" + case StateOtherActive: + return "inactive (other instance active)" + case StateAllInactive: + return "inactive (all instances inactive)" + case StateInactiveUnkown: + return "inactive (other instances unkown)" + default: + return "(invalid)" + } +} diff --git a/main.go b/main.go index d3242085..43b43b9e 100644 --- a/main.go +++ b/main.go @@ -106,6 +106,7 @@ func main() { metricsInfo := config.GetMetricsInfo() redisConn := connection.NewRDBWrapper(redisInfo.Host+":"+redisInfo.Port, redisInfo.Password, redisInfo.PoolSize) + redisConnHa := connection.NewRDBWrapper(redisInfo.Host+":"+redisInfo.Port, redisInfo.Password, 1) var dbDSN string if filepath.IsAbs(mysqlInfo.Host) { @@ -118,6 +119,10 @@ func main() { if err != nil { log.Fatal(err) } + mysqlConnHa, err := connection.NewDBWrapper(dbDSN, 1) + if err != nil { + log.Fatal(err) + } super := supervisor.Supervisor{ ChErr: make(chan error), @@ -129,13 +134,13 @@ func main() { } chEnv := make(chan *ha.Environment) - haInstance, err := ha.NewHA(&super) + haInstance, err := ha.NewHA(&super, mysqlConnHa) if err != nil { log.Fatal(err) } go haInstance.StartHA(chEnv) - go ha.IcingaHeartbeatListener(redisConn, chEnv, super.ChErr) + go ha.IcingaHeartbeatListener(redisConnHa, chEnv, super.ChErr) go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16) @@ -145,8 +150,6 @@ func main() { history.StartHistoryWorkers(&super) - go haInstance.StartEventListener() - if metricsInfo.Host != "" { go prometheus.HandleHttp("["+metricsInfo.Host+"]:"+metricsInfo.Port, super.ChErr) } @@ -228,9 +231,12 @@ func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) { &hoststate.ObjectInformation, } + configSyncHA := configsync.NewConfigSyncHA(super, haInstance.RegisterStateChangeListener()) + configSyncHA.Start() + for _, objectInformation := range objectTypes { go func(information *configobject.ObjectInformation) { - super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(information.NotificationListenerType), information) + super.ChErr <- configsync.Operator(super, configSyncHA.RegisterNotificationListener(information.NotificationListenerType), information) }(objectInformation) } }