Merge pull request #232 from Icinga/bugfix/multiple-ha-takeovers

HA: Prevent multiple instances from taking over at the same time
This commit is contained in:
Noah Hilverling 2021-02-17 12:17:18 +01:00 committed by GitHub
commit 3df4b95de7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 748 additions and 390 deletions

View file

@ -7,7 +7,6 @@ import (
"fmt"
"github.com/Icinga/icingadb/configobject"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/ha"
"github.com/Icinga/icingadb/jsondecoder"
"github.com/Icinga/icingadb/supervisor"
"github.com/Icinga/icingadb/utils"
@ -60,14 +59,14 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
for msg := range chHA {
switch msg {
// Icinga 2 probably restarted or died, stop operations and tell all workers to shut down.
case ha.Notify_StopSync:
case Notify_StopSync:
if done != nil {
log.Debugf("%s: Lost responsibility", objectInformation.ObjectType)
close(done)
done = nil
}
// Starts up the whole sync process.
case ha.Notify_StartSync:
case Notify_StartSync:
if done != nil {
continue
}

View file

@ -7,7 +7,6 @@ import (
"github.com/Icinga/icingadb/configobject"
"github.com/Icinga/icingadb/configobject/objecttypes/host"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/ha"
"github.com/Icinga/icingadb/jsondecoder"
"github.com/Icinga/icingadb/supervisor"
"github.com/Icinga/icingadb/utils"
@ -66,7 +65,7 @@ func TestOperator_InsertHost(t *testing.T) {
testbackends.RedisTestClient.HSet("icinga:checksum:host", "a9ef44eb69fda8fbc32bee33322b6518057f559f", "{\"checksum\":\"b6e87de3d4f31b3d4d35466171f4088693b46071\"}")
for _, ch := range chs {
ch <- ha.Notify_StartSync
ch <- Notify_StartSync
}
assert.Eventually(t, func() bool {
@ -142,7 +141,7 @@ func TestOperator_DeleteHost(t *testing.T) {
require.NoError(t, err)
for _, ch := range chs {
ch <- ha.Notify_StartSync
ch <- Notify_StartSync
}
assert.Eventually(t, func() bool {
@ -214,7 +213,7 @@ func TestOperator_UpdateHost(t *testing.T) {
require.NoError(t, err)
for _, ch := range chs {
ch <- ha.Notify_StartSync
ch <- Notify_StartSync
}
assert.Eventually(t, func() bool {

View file

@ -0,0 +1,117 @@
package configsync
import (
"github.com/Icinga/icingadb/ha"
"github.com/Icinga/icingadb/supervisor"
"github.com/go-redis/redis/v7"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
const (
Notify_StartSync = iota
Notify_StopSync
)
type ConfigSyncHA struct {
super *supervisor.Supervisor
done chan struct{}
chHA <-chan ha.State
notificationListeners map[string][]chan int
notificationListenersMutex sync.Mutex
lastEventId string
haIsActive bool
}
func NewConfigSyncHA(super *supervisor.Supervisor, chHA <-chan ha.State) *ConfigSyncHA {
return &ConfigSyncHA{
super: super,
done: make(chan struct{}),
chHA: chHA,
notificationListeners: map[string][]chan int{},
lastEventId: "0-0",
}
}
func (h *ConfigSyncHA) Start() {
go h.run()
}
func (h *ConfigSyncHA) run() {
every1s := time.NewTicker(time.Second)
loop:
for {
select {
case <-h.done:
log.Info("received done signal, shutting down")
break loop
case <-every1s.C:
h.runEventListener()
}
}
}
func (h *ConfigSyncHA) Stop() {
close(h.done)
}
func (h *ConfigSyncHA) runEventListener() {
select {
case newState := <-h.chHA:
h.haIsActive = newState == ha.StateActive
if !h.haIsActive {
h.lastEventId = "0-0"
h.notifyNotificationListener("*", Notify_StopSync)
}
default: // don't block if there is no change
}
if !h.haIsActive {
return
}
result := h.super.Rdbw.XRead(&redis.XReadArgs{Block: -1, Streams: []string{"icinga:dump", h.lastEventId}})
streams, err := result.Result()
if err != nil {
if err.Error() != "redis: nil" {
h.super.ChErr <- err
}
return
}
events := streams[0].Messages
if len(events) == 0 {
return
}
for _, event := range events {
h.lastEventId = event.ID
values := event.Values
if values["state"] == "done" {
h.notifyNotificationListener(values["type"].(string), Notify_StartSync)
} else {
h.notifyNotificationListener(values["type"].(string), Notify_StopSync)
}
}
}
func (h *ConfigSyncHA) RegisterNotificationListener(listenerType string) chan int {
ch := make(chan int, 10)
h.notificationListenersMutex.Lock()
h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch)
h.notificationListenersMutex.Unlock()
return ch
}
func (h *ConfigSyncHA) notifyNotificationListener(listenerType string, msg int) {
for t, chs := range h.notificationListeners {
if t == listenerType || listenerType == "*" {
for _, c := range chs {
c <- msg
}
}
}
}

View file

@ -0,0 +1,104 @@
package configsync
import (
"github.com/Icinga/icingadb/config/testbackends"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/ha"
"github.com/Icinga/icingadb/supervisor"
"github.com/go-redis/redis/v7"
"github.com/stretchr/testify/assert"
"sync"
"testing"
)
func GetSuper() *supervisor.Supervisor {
redisConn := connection.NewRDBWrapper(testbackends.RedisTestAddr, "", 64)
return &supervisor.Supervisor{
ChErr: make(chan error),
Rdbw: redisConn,
}
}
func TestHA_NotificationListeners(t *testing.T) {
super := GetSuper()
chHA := make(chan ha.State)
haInst := NewConfigSyncHA(super, chHA)
chHost := haInst.RegisterNotificationListener("host")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
assert.Equal(t, Notify_StartSync, <-chHost)
wg.Done()
}()
haInst.notifyNotificationListener("host", Notify_StartSync)
wg.Wait()
chService := haInst.RegisterNotificationListener("service")
wg.Add(1)
go func() {
assert.Equal(t, Notify_StartSync, <-chService)
wg.Done()
}()
haInst.notifyNotificationListener("service", Notify_StartSync)
wg.Wait()
wg.Add(1)
go func() {
assert.Equal(t, Notify_StartSync, <-chService)
assert.Equal(t, Notify_StartSync, <-chHost)
wg.Done()
}()
haInst.notifyNotificationListener("*", Notify_StartSync)
wg.Wait()
}
func TestHA_EventListener(t *testing.T) {
super := GetSuper()
chHA := make(chan ha.State)
haInst := NewConfigSyncHA(super, chHA)
haInst.Start()
defer haInst.Stop()
chHA <- ha.StateActive
testbackends.RedisTestClient.Del("icinga:dump")
chHost := haInst.RegisterNotificationListener("host")
chService := haInst.RegisterNotificationListener("service")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
assert.Equal(t, Notify_StartSync, <-chHost)
assert.Equal(t, Notify_StopSync, <-chHost)
assert.Equal(t, Notify_StartSync, <-chHost)
assert.Equal(t, Notify_StopSync, <-chHost)
wg.Done()
}()
go func() {
assert.Equal(t, Notify_StartSync, <-chService)
assert.Equal(t, Notify_StopSync, <-chService)
assert.Equal(t, Notify_StartSync, <-chService)
wg.Done()
}()
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}})
wg.Wait()
}

View file

@ -300,12 +300,6 @@ func (dbw *DBWrapper) SqlCommit(tx DbTransaction, quiet bool) error {
}).Debug("COMMIT transaction")
}
if err != nil {
if dbw.isConnectionError(err) {
continue
}
}
return err
}
}
@ -335,12 +329,6 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error {
err = tx.Rollback()
}
if err != nil {
if dbw.isConnectionError(err) {
continue
}
}
return err
}
}
@ -382,7 +370,7 @@ func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryObserver prometh
}
// sqlExecInternal is a wrapper around sql.Exec() for auto-logging.
func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, sql string, quiet bool, args ...interface{}) (sql.Result, error) {
func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, query string, quiet bool, args ...interface{}) (sql.Result, error) {
for {
if !dbw.IsConnected() {
dbw.WaitForConnection()
@ -394,7 +382,7 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prome
benchmarc = utils.NewBenchmark()
}
res, err := db.Exec(sql, args...)
res, err := db.Exec(query, args...)
DbOperationsExec.Inc()
if !quiet {
@ -408,12 +396,12 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prome
"benchmark": benchmarc,
"affected_rows": prettyPrintedRowsAffected{res},
"args": prettyPrintedArgs{args},
"query": prettyPrintedSql{sql},
"query": prettyPrintedSql{query},
}).Debug("Finished Exec")
}
if err != nil {
if dbw.isConnectionError(err) {
if _, isTx := db.(DbTransaction); !isTx && dbw.isConnectionError(err) {
continue
}
}
@ -433,10 +421,8 @@ func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryObserve
res, err := sqlTryFetchAll(db, queryObserver, query, quiet, args...)
if err != nil {
if _, isDb := db.(*sql.DB); isDb {
if dbw.isConnectionError(err) {
continue
}
if _, isTx := db.(DbTransaction); !isTx && dbw.isConnectionError(err) {
continue
}
}

View file

@ -107,36 +107,6 @@ func TestDBWrapper_CheckConnection(t *testing.T) {
assert.Equal(t, uint32(11), atomic.LoadUint32(dbw.ConnectionLostCounterAtomic))
}
func TestDBWrapper_SqlCommit(t *testing.T) {
mockDb := new(DbMock)
dbw := NewTestDBW(mockDb)
mockTx := new(TransactionMock)
mockTx.On("Commit").Return(errors.New("whoops")).Once()
mockTx.On("Commit").Return(nil).Once()
mockDb.On("Ping").Return(errors.New("whoops")).Once()
var err error
done := make(chan bool)
dbw.CompareAndSetConnected(true)
go func() {
err = dbw.SqlCommit(mockTx, false)
done <- true
}()
time.Sleep(time.Millisecond * 50)
dbw.CompareAndSetConnected(true)
dbw.ConnectionUpCondition.Broadcast()
<-done
assert.NoError(t, err)
mockTx.AssertExpectations(t)
mockDb.AssertExpectations(t)
}
func TestDBWrapper_SqlBegin(t *testing.T) {
mockDb := new(DbMock)
dbw := NewTestDBW(mockDb)

480
ha/ha.go
View file

@ -10,7 +10,6 @@ import (
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/supervisor"
"github.com/Icinga/icingadb/utils"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
@ -18,30 +17,30 @@ import (
"time"
)
const (
Notify_StartSync = iota
Notify_StopSync
)
type HA struct {
isActive bool
lastHeartbeat int64
uid uuid.UUID
super *supervisor.Supervisor
notificationListeners map[string][]chan int
notificationListenersMutex sync.Mutex
lastEventId string
logger *log.Entry
heartbeatTimer *time.Timer
state State
stateChangeListeners []chan State
stateChangeListenersMutex sync.Mutex
lastHeartbeat int64
uid uuid.UUID
super *supervisor.Supervisor
logger *log.Entry
dbw *connection.DBWrapper
}
func NewHA(super *supervisor.Supervisor) (*HA, error) {
const (
// We consider heartbeats valid for 10 seconds
heartbeatValidMillisecs = 10 * 1000
// We consider the heartbeat of another instance to be expired 5 seconds after its validity ended
heartbeatTimeoutMillisecs = heartbeatValidMillisecs + 5*1000
)
func NewHA(super *supervisor.Supervisor, dbw *connection.DBWrapper) (*HA, error) {
var err error
ho := HA{
super: super,
notificationListeners: make(map[string][]chan int),
notificationListenersMutex: sync.Mutex{},
lastEventId: "0-0",
super: super,
dbw: dbw,
}
if ho.uid, err = uuid.NewRandom(); err != nil {
@ -52,116 +51,131 @@ func NewHA(super *supervisor.Supervisor) (*HA, error) {
}
var mysqlObservers = struct {
updateIcingadbInstanceById prometheus.Observer
updateIcingadbInstanceByEnvironmentId prometheus.Observer
insertIntoIcingadbInstance prometheus.Observer
insertIntoEnvironment prometheus.Observer
selectIdHeartbeatFromIcingadbInstanceByEnvironmentId prometheus.Observer
updateIcingadbInstanceById prometheus.Observer
updateIcingadbInstanceByEnvironmentId prometheus.Observer
insertIntoIcingadbInstance prometheus.Observer
insertIntoEnvironment prometheus.Observer
selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId prometheus.Observer
selectHeartbeatResponsibleFromIcingadbInstanceById prometheus.Observer
deleteIcingadbInstanceByEndpointId prometheus.Observer
}{
connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"),
connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"),
connection.DbIoSeconds.WithLabelValues("mysql", "insert into icingadb_instance"),
connection.DbIoSeconds.WithLabelValues("mysql", "insert into environment"),
connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat from icingadb_instance where environment_id = ourEnvID"),
connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat, responsible from icingadb_instance where environment_id = ourEnvID"),
connection.DbIoSeconds.WithLabelValues("mysql", "select heartbeat, responsible from icingadb_instance by id"),
connection.DbIoSeconds.WithLabelValues("mysql", "delete from icingadb_instance by endpoint_id"),
}
func (h *HA) updateOwnInstance(env *Environment) error {
_, err := h.super.Dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, heartbeat, responsible,"+
func (h *HA) setState(state State) {
switch state {
// valid new states (no action needed)
case StateActive:
case StateOtherActive:
case StateAllInactive:
case StateInactiveUnkown:
// invalid arguments
case StateInit:
h.logger.Fatal("Must not set HA state to StateInit")
default:
h.logger.Fatalf("Trying to change to invalid HA state %d", state)
}
if state != h.state {
if h.state != StateInit {
h.logger.Infof("Changing HA state to %s (was %s)", state.String(), h.state.String())
} else {
h.logger.Infof("Changing HA state to %s", state.String())
}
h.state = state
h.notifyStateChangeListeners(state)
}
}
func (h *HA) upsertInstance(tx connection.DbTransaction, env *Environment, isActive bool) error {
if isActive {
// If we are active or become active, ensure that no other instance has the active flag set.
_, err := h.dbw.SqlExecTx(tx, mysqlObservers.updateIcingadbInstanceByEnvironmentId,
"UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND responsible = ?",
utils.Bool[false], h.super.EnvId, utils.Bool[true])
if err != nil {
return err
}
}
_, err := h.dbw.SqlExecTx(
tx, mysqlObservers.insertIntoIcingadbInstance,
"REPLACE INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+
" icinga2_active_service_checks_enabled, icinga2_active_host_checks_enabled,"+
" icinga2_event_handlers_enabled, icinga2_flap_detection_enabled,"+
" icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, 'y', ?, ?, ?, ?, ?, ?, ?, ?)",
h.uid[:],
h.super.EnvId,
env.Icinga2.EndpointId,
h.lastHeartbeat,
env.Icinga2.Version,
int64(env.Icinga2.ProgramStart*1000),
utils.Bool[env.Icinga2.NotificationsEnabled],
utils.Bool[env.Icinga2.ActiveServiceChecksEnabled],
utils.Bool[env.Icinga2.ActiveHostChecksEnabled],
utils.Bool[env.Icinga2.EventHandlersEnabled],
utils.Bool[env.Icinga2.FlapDetectionEnabled],
utils.Bool[env.Icinga2.PerformanceDataEnabled],
" icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
h.uid[:], // id
h.super.EnvId, // environment_id
env.Icinga2.EndpointId, // endpoint_id
utils.Bool[isActive], // responsible
h.lastHeartbeat, // heartbeat
env.Icinga2.Version, // icinga2_version
int64(env.Icinga2.ProgramStart*1000), // icinga2_start_time
utils.Bool[env.Icinga2.NotificationsEnabled], // icinga2_notifications_enabled
utils.Bool[env.Icinga2.ActiveServiceChecksEnabled], // icinga2_active_service_checks_enabled
utils.Bool[env.Icinga2.ActiveHostChecksEnabled], // icinga2_active_host_checks_enabled
utils.Bool[env.Icinga2.EventHandlersEnabled], // icinga2_event_handlers_enabled
utils.Bool[env.Icinga2.FlapDetectionEnabled], // icinga2_flap_detection_enabled
utils.Bool[env.Icinga2.PerformanceDataEnabled], // icinga2_performance_data_enabled
)
return err
}
func (h *HA) takeOverInstance(env *Environment) error {
_, err := h.super.Dbw.SqlExec(
mysqlObservers.updateIcingadbInstanceByEnvironmentId,
"UPDATE icingadb_instance SET id = ?, endpoint_id = ?, heartbeat = ?,"+
" icinga2_version = ?, icinga2_start_time = ?, icinga2_notifications_enabled = ?,"+
" icinga2_active_service_checks_enabled = ?, icinga2_active_host_checks_enabled = ?,"+
" icinga2_event_handlers_enabled = ?, icinga2_flap_detection_enabled = ?,"+
" icinga2_performance_data_enabled = ? WHERE environment_id = ?",
h.uid[:],
env.Icinga2.EndpointId,
h.lastHeartbeat,
env.Icinga2.Version,
int64(env.Icinga2.ProgramStart*1000),
utils.Bool[env.Icinga2.NotificationsEnabled],
utils.Bool[env.Icinga2.ActiveServiceChecksEnabled],
utils.Bool[env.Icinga2.ActiveHostChecksEnabled],
utils.Bool[env.Icinga2.EventHandlersEnabled],
utils.Bool[env.Icinga2.FlapDetectionEnabled],
utils.Bool[env.Icinga2.PerformanceDataEnabled],
h.super.EnvId,
func (h *HA) getActiveInstance(tx connection.DbTransaction) (bool, uuid.UUID, error) {
rows, err := h.dbw.SqlFetchAllTx(
tx, mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId,
"SELECT id, heartbeat FROM icingadb_instance"+
" WHERE environment_id = ? AND responsible = ? AND heartbeat > ?",
h.super.EnvId, utils.Bool[true], utils.TimeToMillisecs(time.Now())-heartbeatTimeoutMillisecs,
)
return err
}
func (h *HA) insertInstance(env *Environment) error {
_, err := h.super.Dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, heartbeat, responsible,"+
" icinga2_version, icinga2_start_time, icinga2_notifications_enabled,"+
" icinga2_active_service_checks_enabled, icinga2_active_host_checks_enabled,"+
" icinga2_event_handlers_enabled, icinga2_flap_detection_enabled,"+
" icinga2_performance_data_enabled) VALUES (?, ?, ?, ?, 'y', ?, ?, ?, ?, ?, ?, ?, ?)",
h.uid[:],
h.super.EnvId,
env.Icinga2.EndpointId,
h.lastHeartbeat,
env.Icinga2.Version,
int64(env.Icinga2.ProgramStart*1000),
utils.Bool[env.Icinga2.NotificationsEnabled],
utils.Bool[env.Icinga2.ActiveServiceChecksEnabled],
utils.Bool[env.Icinga2.ActiveHostChecksEnabled],
utils.Bool[env.Icinga2.EventHandlersEnabled],
utils.Bool[env.Icinga2.FlapDetectionEnabled],
utils.Bool[env.Icinga2.PerformanceDataEnabled],
)
return err
}
func (h *HA) getInstance() (bool, uuid.UUID, int64, error) {
rows, err := h.super.Dbw.SqlFetchAll(
mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId,
"SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1",
h.super.EnvId,
)
if err != nil {
return false, uuid.UUID{}, 0, err
return false, uuid.UUID{}, err
}
if len(rows) > 1 {
return false, uuid.UUID{}, errors.New("there is more than one active IcingaDB instance")
}
if len(rows) == 0 {
return false, uuid.UUID{}, 0, nil
// No active instance according to database.
return false, uuid.UUID{}, nil
}
var theirUUID uuid.UUID
copy(theirUUID[:], rows[0][0].([]byte))
idBytes := rows[0][0].([]byte)
icinga2Heartbeat := rows[0][1].(int64)
return true, theirUUID, rows[0][1].(int64), nil
activeId, err := uuid.FromBytes(idBytes)
if err != nil {
return false, uuid.UUID{}, fmt.Errorf("invalid active UUID in database: %s", err.Error())
}
icinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - icinga2Heartbeat
if activeId == h.uid && icinga2HeartbeatAge > heartbeatValidMillisecs {
// Our heartbeat is too old to be considered valid, no longer consider ourselves to be active.
return false, uuid.UUID{}, nil
} else if activeId != h.uid && icinga2HeartbeatAge > heartbeatTimeoutMillisecs {
// Their heartbeat is old enough to be considered timed out, no longer consider them to be active.
return false, uuid.UUID{}, nil
}
return true, activeId, nil
}
func (h *HA) StartHA(chEnv chan *Environment) {
env := h.waitForEnvironment(chEnv)
h.lastHeartbeat = utils.TimeToMillisecs(time.Now())
err := h.setAndInsertEnvironment(env)
if err != nil {
h.super.ChErr <- fmt.Errorf("Could not insert environment into MySQL: %s", err.Error())
h.super.ChErr <- fmt.Errorf("could not insert environment into MySQL: %s", err.Error())
}
h.logger = log.WithFields(log.Fields{
@ -172,13 +186,7 @@ func (h *HA) StartHA(chEnv chan *Environment) {
h.logger.Info("Got initial environment.")
h.checkResponsibility(env)
h.heartbeatTimer = time.NewTimer(time.Second * 15)
for {
h.runHA(chEnv)
}
h.runHA(chEnv, env)
}
func (h *HA) waitForEnvironment(chEnv chan *Environment) *Environment {
@ -198,7 +206,7 @@ func (h *HA) waitForEnvironment(chEnv chan *Environment) *Environment {
func (h *HA) setAndInsertEnvironment(env *Environment) error {
h.super.EnvId = env.ID
_, err := h.super.Dbw.SqlExec(
_, err := h.dbw.SqlExec(
mysqlObservers.insertIntoEnvironment,
"REPLACE INTO environment(id, name) VALUES (?, ?)",
env.ID, env.Name,
@ -207,151 +215,163 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error {
return err
}
// Remove rows from icingadb_instance that were created by previous startups of this instance.
// A row is considered to be created by this instance if it shares the same environment_id and
// endpoint_id. Rows with a recent heartbeat are never removed.
func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environment) error {
heartbeatTimeoutThreshold := utils.TimeToMillisecs(time.Now()) - heartbeatTimeoutMillisecs
_, err := h.dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId,
"DELETE FROM icingadb_instance "+
"WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?",
h.uid[:], h.super.EnvId, env.Icinga2.EndpointId, heartbeatTimeoutThreshold)
return err
}
func (h *HA) checkResponsibility(env *Environment) {
found, _, beat, err := h.getInstance()
if err != nil {
h.logger.Errorf("Failed to fetch instance: %v", err)
h.super.ChErr <- errors.New("failed to fetch instance")
return
}
if utils.TimeToMillisecs(time.Now())-beat > 15*1000 {
h.logger.Info("Taking over.")
// This means there was no instance row match, insert
if !found {
err = h.insertInstance(env)
} else {
err = h.takeOverInstance(env)
}
start := time.Now()
var newState State
err := h.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
err := h.removePreviousInstances(tx, env)
if err != nil {
h.logger.Errorf("Failed to insert/update instance: %v", err)
h.super.ChErr <- errors.New("failed to insert/update instance")
return
return err
}
h.isActive = true
} else {
h.logger.Info("Other instance is active.")
h.isActive = false
h.lastEventId = "0-0"
}
}
func (h *HA) runHA(chEnv chan *Environment) {
select {
case env := <-chEnv:
if bytes.Compare(env.ID, h.super.EnvId) != 0 {
h.logger.Error("Received environment is not the one we expected. Panic.")
h.super.ChErr <- errors.New("received unexpected environment")
return
foundActive, activeId, err := h.getActiveInstance(tx)
if err != nil {
return err
}
h.heartbeatTimer.Reset(time.Second * 15)
previous := h.lastHeartbeat
h.lastHeartbeat = utils.TimeToMillisecs(time.Now())
lastIcinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - h.lastHeartbeat
lastIcinga2HeartbeatValid := lastIcinga2HeartbeatAge < heartbeatValidMillisecs
if h.lastHeartbeat-previous < 10*1000 && h.isActive {
err := h.updateOwnInstance(env)
if err != nil {
h.logger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
return
if foundActive {
if activeId == h.uid {
if lastIcinga2HeartbeatValid {
// We are active according to the DB and have a valid heartbeat, keep it that way.
newState = StateActive
} else {
// We are active according to the DB but our heartbeat from Icinga 2 is no longer valid.
// Give up active state so that another instance has a chance to take over.
h.logger.Info("Becoming inactive due to expired Icinga 2 heartbeat.")
newState = StateAllInactive
}
} else {
// Some other instance is active, remain passive
newState = StateOtherActive
}
} else {
_, they, beat, err := h.getInstance()
if err != nil {
h.logger.Errorf("Failed to fetch instance: %v", err)
h.super.ChErr <- errors.New("failed to fetch instance")
return
}
if they == h.uid {
h.logger.Debug("We are active.")
if !h.isActive {
h.logger.Info("Icinga 2 sent heartbeat. Starting sync")
h.isActive = true
}
if err := h.updateOwnInstance(env); err != nil {
h.logger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
return
}
} else if h.lastHeartbeat-beat > 15*1000 {
h.logger.Info("Taking over.")
if err := h.takeOverInstance(env); err != nil {
h.logger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
}
h.isActive = true
// No instance is currently active. Try take over, but only
// if we are actively receiving heartbeats from Icinga 2.
if lastIcinga2HeartbeatValid {
h.logger.Info("No active instance, trying to take over.")
newState = StateActive
} else {
h.logger.Debug("Other instance is active.")
if h.state == StateActive {
// We are active according to our last state, however when
// reading from the database our heartbeat expired already.
h.logger.Info("Becoming inactive due to expired Icinga 2 heartbeat.")
}
newState = StateAllInactive
}
}
case <-h.heartbeatTimer.C:
h.logger.Info("Icinga 2 sent no heartbeat for 15 seconds. Pausing sync")
h.isActive = false
h.lastEventId = "0-0"
h.notifyNotificationListener("*", Notify_StopSync)
err = h.upsertInstance(tx, env, newState == StateActive)
if err != nil {
return err
}
return nil
})
if err != nil {
// Transaction failed, we are not sure about the current global state.
// In any case, we ensure that we are no longer active.
h.super.ChErr <- errors.New("HA heartbeat failed")
h.logger.Errorf("HA heartbeat failed: %s", err.Error())
newState = StateInactiveUnkown
}
txDuration := time.Since(start)
icinga2HeartbeatAge := utils.TimeToMillisecs(time.Now()) - h.lastHeartbeat
if newState == StateActive && txDuration > heartbeatValidMillisecs*time.Millisecond/2 {
// The SQL transaction is too slow if it takes more than half the heartbeat validity
// period as in this case, we cannot expect to renew the heartbeat in time.
h.logger.Warnf("SQL transaction took %s, too slow to keep our heartbeat alive. "+
"Check the health of your database.", txDuration)
newState = StateAllInactive
} else if newState == StateActive && icinga2HeartbeatAge > heartbeatValidMillisecs {
// If this was a forced periodic update, the heartbeat might also have
// expired during the execution of the SQL transaction.
h.logger.Warnf("Icinga 2 heartbeat expired during SQL transaction")
newState = StateAllInactive
}
h.setState(newState)
}
func (h *HA) StartEventListener() {
every1s := time.NewTicker(time.Second)
func (h *HA) runHA(chEnv chan *Environment, env *Environment) {
// Force regular Icinga DB heartbeat writes to the database even if we receive no heartbeats from
// Icinga 2. Icinga 2 will send an heartbeat every second, so use two seconds here to avoid
// situations like forcing the update right before we receive the next heartbeat from Icinga 2.
const updateTimerDuration = 2 * time.Second
updateTimer := time.NewTimer(updateTimerDuration)
for {
<-every1s.C
h.runEventListener()
var newEnv *Environment
// Selecting from multiple channels does not guarantee which case gets executed if multiple ones
// are ready. However, when both a new environment is available and the update timer expired, we
// want to prefer the new environment. Therefore first try to select only from chEnv and only if
// there is nothing in the channel, i.e. in the default case, select from both channels.
select {
case newEnv = <-chEnv:
default:
select {
case newEnv = <-chEnv:
case <-updateTimer.C:
}
}
if newEnv != nil {
env = newEnv
if bytes.Compare(env.ID, h.super.EnvId) != 0 {
h.logger.Error("Received environment is not the one we expected. Panic.")
h.super.ChErr <- errors.New("received unexpected environment")
return
}
h.lastHeartbeat = utils.TimeToMillisecs(time.Now())
}
updateTimer.Reset(updateTimerDuration)
h.checkResponsibility(env)
}
}
func (h *HA) runEventListener() {
if !h.isActive {
return
}
func (h *HA) RegisterStateChangeListener() <-chan State {
// The channel has a buffer of size so that it can hold the most recent state. If it is full when we try to write,
// we chan just drain it as the element it contains is outdated anyways.
ch := make(chan State, 1)
result := h.super.Rdbw.XRead(&redis.XReadArgs{Block: -1, Streams: []string{"icinga:dump", h.lastEventId}})
streams, err := result.Result()
if err != nil {
if err.Error() != "redis: nil" {
h.super.ChErr <- err
}
return
}
h.stateChangeListenersMutex.Lock()
defer h.stateChangeListenersMutex.Unlock()
events := streams[0].Messages
if len(events) == 0 {
return
}
for _, event := range events {
h.lastEventId = event.ID
values := event.Values
if values["state"] == "done" {
h.notifyNotificationListener(values["type"].(string), Notify_StartSync)
} else {
h.notifyNotificationListener(values["type"].(string), Notify_StopSync)
}
}
}
func (h *HA) RegisterNotificationListener(listenerType string) chan int {
ch := make(chan int, 10)
h.notificationListenersMutex.Lock()
h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch)
h.notificationListenersMutex.Unlock()
h.stateChangeListeners = append(h.stateChangeListeners, ch)
return ch
}
func (h *HA) notifyNotificationListener(listenerType string, msg int) {
for t, chs := range h.notificationListeners {
if t == listenerType || listenerType == "*" {
for _, c := range chs {
c <- msg
}
func (h *HA) notifyStateChangeListeners(state State) {
h.stateChangeListenersMutex.Lock()
defer h.stateChangeListenersMutex.Unlock()
for _, ch := range h.stateChangeListeners {
// drain the channel
select {
case <-ch:
default:
}
ch <- state
}
}

View file

@ -4,42 +4,37 @@ package ha
import (
"crypto/sha1"
"encoding/hex"
"github.com/Icinga/icingadb/config/testbackends"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/supervisor"
"github.com/Icinga/icingadb/utils"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"strconv"
"sync"
"testing"
"time"
)
func createTestingHA(t *testing.T, redisAddr string) *HA {
redisConn := connection.NewRDBWrapper(redisAddr, "", 64)
mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50)
if err != nil {
assert.Fail(t, "This test needs a working MySQL connection!")
}
mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 1)
require.NoError(t, err, "This test needs a working MySQL connection!")
super := supervisor.Supervisor{
ChErr: make(chan error),
Rdbw: redisConn,
Dbw: mysqlConn,
}
ha, _ := NewHA(&super)
ha, _ := NewHA(&super, mysqlConn)
hash := sha1.New()
hash.Write([]byte("derp"))
ha.super.EnvId = hash.Sum(nil)
ha.uid = uuid.MustParse("551bc748-94b2-4d27-b6a4-15c52aecfe85")
_, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance")
_, err = ha.dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance")
require.NoError(t, err, "This test needs a working MySQL connection!")
ha.logger = log.WithFields(log.Fields{
@ -50,16 +45,52 @@ func createTestingHA(t *testing.T, redisAddr string) *HA {
return ha
}
func createTestingMultipleHA(t *testing.T, redisAddr string, numInstances int) ([]*HA, <-chan error) {
mysqlConn, err := connection.NewDBWrapper(testbackends.MysqlTestDsn, 50)
require.NoError(t, err, "This test needs a working MySQL connection!")
_, err = mysqlConn.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance")
require.NoError(t, err, "This test needs a working MySQL connection!")
instances := make([]*HA, numInstances)
chErr := make(chan error)
for i := 0; i < numInstances; i++ {
super := supervisor.Supervisor{
ChErr: chErr,
}
ha, _ := NewHA(&super, mysqlConn)
hash := sha1.New()
hash.Write([]byte("derp"))
ha.super.EnvId = hash.Sum(nil)
ha.uid = uuid.NewSHA1(uuid.MustParse("551bc748-94b2-4d27-b6a4-15c52aecfe85"), []byte(strconv.Itoa(i)))
ha.logger = log.WithFields(log.Fields{
"context": "HA-Testing",
"UUID": ha.uid,
})
instances[i] = ha
}
return instances, chErr
}
var mysqlTestObserver = connection.DbIoSeconds.WithLabelValues("mysql", "test")
func TestHA_InsertInstance(t *testing.T) {
func TestHA_UpsertInstance(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
err := ha.insertInstance(&Environment{})
require.NoError(t, err, "insertInstance should not return an error")
err := ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
return ha.upsertInstance(tx, &Environment{}, false)
})
require.NoError(t, err, "transaction running upsertInstance should not return an error")
rows, err := ha.super.Dbw.SqlFetchAll(
mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId,
rows, err := ha.dbw.SqlFetchAll(
mysqlObservers.selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId,
"SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1",
ha.super.EnvId,
)
@ -73,41 +104,139 @@ func TestHA_InsertInstance(t *testing.T) {
assert.Equal(t, ha.uid, theirUUID, "UUID must match")
}
func TestHA_checkResponsibility(t *testing.T) {
func TestHA_checkResponsibility_NoOtherInstance(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
now := utils.TimeToMillisecs(time.Now())
ha.lastHeartbeat = now
ha.checkResponsibility(&Environment{})
assert.Equal(t, true, ha.isActive, "HA should be responsible, if no other instance is active")
assert.Equal(t, StateActive, ha.state, "HA should be active if no other instance exists")
}
_, err := ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance")
require.NoError(t, err, "This test needs a working MySQL connection!")
func TestHA_checkResponsibility_OtherInactiveInstance(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
_, err = ha.super.Dbw.SqlExec(
now := utils.TimeToMillisecs(time.Now())
otherUuid, err := uuid.NewRandom()
assert.NoError(t, err, "UUID generation failed")
_, err = ha.dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible, icinga2_version, icinga2_start_time) VALUES (?, ?, ?, 'y', '', 0)",
ha.uid[:], ha.super.EnvId, 0,
"INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?)",
otherUuid[:], ha.super.EnvId, utils.Bool[false], now, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
ha.isActive = false
ha.lastHeartbeat = now
ha.checkResponsibility(&Environment{})
assert.Equal(t, true, ha.isActive, "HA should be responsible, if another instance was inactive for a long time")
assert.Equal(t, StateActive, ha.state, "HA should be active if there is only an inactive instance")
}
_, err = ha.super.Dbw.SqlExec(mysqlTestObserver, "TRUNCATE TABLE icingadb_instance")
require.NoError(t, err, "This test needs a working MySQL connection!")
func TestHA_checkResponsibility_OtherTimedOutInstance(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
_, err = ha.super.Dbw.SqlExec(
now := utils.TimeToMillisecs(time.Now())
timedOut := now - heartbeatTimeoutMillisecs
otherUuid, err := uuid.NewRandom()
assert.NoError(t, err, "UUID generation failed")
_, err = ha.dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible, icinga2_version, icinga2_start_time) VALUES (?, ?, ?, 'y', '', 0)",
ha.uid[:], ha.super.EnvId, utils.TimeToMillisecs(time.Now()),
"INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?)",
otherUuid[:], ha.super.EnvId, utils.Bool[true], timedOut, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
ha.isActive = false
ha.lastHeartbeat = now
ha.checkResponsibility(&Environment{})
assert.Equal(t, false, ha.isActive, "HA should not be responsible, if another instance is active")
assert.Equal(t, StateActive, ha.state, "HA should be active if another instance is timed out")
}
func TestHA_checkResponsibility_OtherActiveInstance(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
now := utils.TimeToMillisecs(time.Now())
otherUuid, err := uuid.NewRandom()
assert.NoError(t, err, "UUID generation failed")
_, err = ha.dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?)",
otherUuid[:], ha.super.EnvId, utils.Bool[true], now, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
ha.lastHeartbeat = now
ha.checkResponsibility(&Environment{})
assert.Equal(t, StateOtherActive, ha.state, "HA should not be active if another instance is active")
}
func TestHA_checkResponsibility_Concurrent(t *testing.T) {
numAttempts := 10
numConcurrentTakeovers := 2
failed := false
for attempt := 0; !failed && attempt < numAttempts; attempt++ {
wg := sync.WaitGroup{}
wg.Add(numConcurrentTakeovers)
haInstances, chErr := createTestingMultipleHA(t, testbackends.RedisTestAddr, numConcurrentTakeovers)
for _, ha := range haInstances {
ha.lastHeartbeat = utils.TimeToMillisecs(time.Now())
}
for _, ha := range haInstances {
ha := ha
go func() {
defer wg.Done()
ha.checkResponsibility(&Environment{})
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
loop:
for {
select {
case err := <-chErr:
assert.NoError(t, err, "checkResponsibility() should return no error")
if err != nil {
failed = true
}
case <-done:
break loop
}
}
numActive := 0
for _, ha := range haInstances {
if ha.state == StateActive {
numActive++
}
}
assert.Equal(t, 1, numActive, "exactly 1 instance must be active after checkResponsibility() but %d are active", numActive)
if numActive != 1 {
failed = true
}
}
}
func TestHA_waitForEnvironment(t *testing.T) {
@ -156,7 +285,7 @@ func TestHA_setAndInsertEnvironment(t *testing.T) {
err := ha.setAndInsertEnvironment(&env)
require.NoError(t, err, "setAndInsertEnvironment should not return an error")
rows, err := ha.super.Dbw.SqlFetchAll(
rows, err := ha.dbw.SqlFetchAll(
mysqlTestObserver,
"SELECT name from environment where id = ? LIMIT 1",
ha.super.EnvId,
@ -169,7 +298,6 @@ func TestHA_setAndInsertEnvironment(t *testing.T) {
func TestHA_runHA(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
ha.heartbeatTimer = time.NewTimer(10 * time.Second)
chEnv := make(chan *Environment)
@ -199,82 +327,85 @@ func TestHA_runHA(t *testing.T) {
wg.Done()
}()
ha.runHA(chEnv)
ha.runHA(chEnv, &Environment{})
wg.Wait()
}
func TestHA_NotificationListeners(t *testing.T) {
func TestHA_RegisterStateChangeListener(t *testing.T) {
ha := createTestingHA(t, testbackends.RedisTestAddr)
chHost := ha.RegisterNotificationListener("host")
wg := sync.WaitGroup{}
wg.Add(1)
assertNonBlockingState := func(expected State, ch <-chan State, msgAndArgs ...interface{}) {
select {
case actual := <-ch:
assert.Equal(t, expected, actual, msgAndArgs...)
default:
assert.Fail(t, "reading from channel should not block", msgAndArgs)
}
}
go func() {
assert.Equal(t, Notify_StartSync, <-chHost)
wg.Done()
}()
chHA := ha.RegisterStateChangeListener()
ha.notifyNotificationListener("host", Notify_StartSync)
wg.Wait()
ha.lastHeartbeat = utils.TimeToMillisecs(time.Now())
ha.checkResponsibility(&Environment{})
assertNonBlockingState(StateActive, chHA, "HA should send StateActive to state change channel when becoming active")
chService := ha.RegisterNotificationListener("service")
wg.Add(1)
go func() {
assert.Equal(t, Notify_StartSync, <-chService)
wg.Done()
}()
ha.notifyNotificationListener("service", Notify_StartSync)
wg.Wait()
wg.Add(1)
go func() {
assert.Equal(t, Notify_StartSync, <-chService)
assert.Equal(t, Notify_StartSync, <-chHost)
wg.Done()
}()
ha.notifyNotificationListener("*", Notify_StartSync)
wg.Wait()
ha.lastHeartbeat = 0
ha.checkResponsibility(&Environment{})
assertNonBlockingState(StateAllInactive, chHA, "HA should send StateAllInactive to state change channel when becoming inactive")
}
func TestHA_EventListener(t *testing.T) {
func TestHA_removePreviousInstances(t *testing.T) {
env := &Environment{}
env.Name = "test"
env.ID = Sha1bytes([]byte(env.Name))
env.Icinga2.EndpointId = make([]byte, 20)
ha := createTestingHA(t, testbackends.RedisTestAddr)
ha.isActive = true
go ha.StartEventListener()
err := ha.setAndInsertEnvironment(env)
require.NoError(t, err, "setAndInsertEnvironment should not return an error")
testbackends.RedisTestClient.Del("icinga:dump")
err = ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
return ha.upsertInstance(tx, &Environment{}, false)
})
require.NoError(t, err, "upsertInstance() should not return an error")
chHost := ha.RegisterNotificationListener("host")
chService := ha.RegisterNotificationListener("service")
now := utils.TimeToMillisecs(time.Now())
activeUuid, err := uuid.NewRandom()
require.NoError(t, err, "UUID generation failed")
_, err = ha.dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?, ?)",
activeUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], now, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
wg := sync.WaitGroup{}
wg.Add(2)
timedOut := now - heartbeatTimeoutMillisecs
timedOutUuid, err := uuid.NewRandom()
require.NoError(t, err, "UUID generation failed")
_, err = ha.dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?, ?)",
timedOutUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], timedOut, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
go func() {
assert.Equal(t, Notify_StartSync, <-chHost)
assert.Equal(t, Notify_StopSync, <-chHost)
assert.Equal(t, Notify_StartSync, <-chHost)
assert.Equal(t, Notify_StopSync, <-chHost)
wg.Done()
}()
err = ha.dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
return ha.removePreviousInstances(tx, env)
})
assert.NoError(t, err, "removePreviousInstances() should not return an error")
go func() {
assert.Equal(t, Notify_StartSync, <-chService)
assert.Equal(t, Notify_StopSync, <-chService)
assert.Equal(t, Notify_StartSync, <-chService)
wg.Done()
}()
rows, err := ha.dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance")
var instanceIds []string
for _, row := range rows {
instanceIds = append(instanceIds, hex.EncodeToString(row[0].([]byte)))
}
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}})
testbackends.RedisTestClient.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}})
wg.Wait()
assert.Contains(t, instanceIds, hex.EncodeToString(ha.uid[:]), "removePreviousInstance() should not remove its own row")
assert.Contains(t, instanceIds, hex.EncodeToString(activeUuid[:]), "removePreviousInstance() should not remove rows that are not timed out yet")
assert.NotContains(t, instanceIds, hex.EncodeToString(timedOutUuid[:]), "removePreviousInstances() should remove timed out instance")
}

View file

@ -5,12 +5,10 @@ package ha
import (
"crypto/sha1"
"encoding/json"
"fmt"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/utils"
"github.com/go-redis/redis/v7"
log "github.com/sirupsen/logrus"
"time"
)
type Environment struct {
@ -43,7 +41,7 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment
log.Info("Starting heartbeat listener")
xReadArgs := redis.XReadArgs{
Streams: []string{"icinga:stats", fmt.Sprintf("%d-0", utils.TimeToMillisecs(time.Now().Add(-15*time.Second)))},
Streams: []string{"icinga:stats", "$"},
Count: 1,
Block: 0,
}

28
ha/state.go Normal file
View file

@ -0,0 +1,28 @@
package ha
type State uint8
const (
StateInit State = iota // Initial state when starting
StateActive // This instance is active
StateOtherActive // This instance is inactive but there is another one that is active
StateAllInactive // All known instances are inactive (i.e. none receives Icinga 2 heartbeats)
StateInactiveUnkown // This instance is inactive but does not known about the state of other instances
)
func (s State) String() string {
switch s {
case StateInit:
return "init"
case StateActive:
return "active"
case StateOtherActive:
return "inactive (other instance active)"
case StateAllInactive:
return "inactive (all instances inactive)"
case StateInactiveUnkown:
return "inactive (other instances unkown)"
default:
return "(invalid)"
}
}

16
main.go
View file

@ -106,6 +106,7 @@ func main() {
metricsInfo := config.GetMetricsInfo()
redisConn := connection.NewRDBWrapper(redisInfo.Host+":"+redisInfo.Port, redisInfo.Password, redisInfo.PoolSize)
redisConnHa := connection.NewRDBWrapper(redisInfo.Host+":"+redisInfo.Port, redisInfo.Password, 1)
var dbDSN string
if filepath.IsAbs(mysqlInfo.Host) {
@ -118,6 +119,10 @@ func main() {
if err != nil {
log.Fatal(err)
}
mysqlConnHa, err := connection.NewDBWrapper(dbDSN, 1)
if err != nil {
log.Fatal(err)
}
super := supervisor.Supervisor{
ChErr: make(chan error),
@ -129,13 +134,13 @@ func main() {
}
chEnv := make(chan *ha.Environment)
haInstance, err := ha.NewHA(&super)
haInstance, err := ha.NewHA(&super, mysqlConnHa)
if err != nil {
log.Fatal(err)
}
go haInstance.StartHA(chEnv)
go ha.IcingaHeartbeatListener(redisConn, chEnv, super.ChErr)
go ha.IcingaHeartbeatListener(redisConnHa, chEnv, super.ChErr)
go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16)
@ -145,8 +150,6 @@ func main() {
history.StartHistoryWorkers(&super)
go haInstance.StartEventListener()
if metricsInfo.Host != "" {
go prometheus.HandleHttp("["+metricsInfo.Host+"]:"+metricsInfo.Port, super.ChErr)
}
@ -228,9 +231,12 @@ func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) {
&hoststate.ObjectInformation,
}
configSyncHA := configsync.NewConfigSyncHA(super, haInstance.RegisterStateChangeListener())
configSyncHA.Start()
for _, objectInformation := range objectTypes {
go func(information *configobject.ObjectInformation) {
super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(information.NotificationListenerType), information)
super.ChErr <- configsync.Operator(super, configSyncHA.RegisterNotificationListener(information.NotificationListenerType), information)
}(objectInformation)
}
}