diff --git a/ha/ha.go b/ha/ha.go
index 7aee070d..4a4e7a9f 100644
--- a/ha/ha.go
+++ b/ha/ha.go
@@ -55,6 +55,7 @@ var mysqlObservers = struct {
 	insertIntoEnvironment                                           prometheus.Observer
 	selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId prometheus.Observer
 	selectHeartbeatResponsibleFromIcingadbInstanceById              prometheus.Observer
+	deleteIcingadbInstanceByEndpointId                              prometheus.Observer
 }{
 	connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"),
 	connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"),
@@ -62,6 +63,7 @@ var mysqlObservers = struct {
 	connection.DbIoSeconds.WithLabelValues("mysql", "insert into environment"),
 	connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat, responsible from icingadb_instance where environment_id = ourEnvID"),
 	connection.DbIoSeconds.WithLabelValues("mysql", "select heartbeat, responsible from icingadb_instance by id"),
+	connection.DbIoSeconds.WithLabelValues("mysql", "delete from icingadb_instance by endpoint_id"),
 }
 
 func (h *HA) setState(state State) {
@@ -211,9 +213,26 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error {
 	return err
 }
 
+// removePreviousInstances deletes rows from icingadb_instance that were created by previous
+// startups of this instance, i.e. rows with our environment_id and the given endpoint_id but an id
+// other than h.uid. Only rows whose heartbeat is older than heartbeatTimeoutMillisecs are removed.
+func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environment) error {
+	heartbeatTimeoutThreshold := utils.TimeToMillisecs(time.Now()) - heartbeatTimeoutMillisecs
+	_, err := h.super.Dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId,
+		"DELETE FROM icingadb_instance "+
+			"WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?",
+		h.uid[:], h.super.EnvId, env.Icinga2.EndpointId, heartbeatTimeoutThreshold)
+	return err
+}
+
 func (h *HA) checkResponsibility(env *Environment) {
 	var newState State
 	err := h.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
+		err := h.removePreviousInstances(tx, env)
+		if err != nil {
+			return err
+		}
+
 		foundActive, activeId, err := h.getActiveInstance(tx)
 		if err != nil {
 			return err
diff --git a/ha/ha_test.go b/ha/ha_test.go
index fc1e2d37..4e82a190 100644
--- a/ha/ha_test.go
+++ b/ha/ha_test.go
@@ -4,6 +4,7 @@ package ha
 
 import (
 	"crypto/sha1"
+	"encoding/hex"
 	"github.com/Icinga/icingadb/config/testbackends"
 	"github.com/Icinga/icingadb/connection"
 	"github.com/Icinga/icingadb/supervisor"
@@ -361,3 +362,61 @@ func TestHA_RegisterStateChangeListener(t *testing.T) {
 	ha.checkResponsibility(&Environment{})
 	assertNonBlockingState(StateAllInactive, chHA, "HA should send StateAllInactive to state change channel when becoming inactive")
 }
+
+func TestHA_removePreviousInstances(t *testing.T) {
+	env := &Environment{}
+	env.Name = "test"
+	env.ID = Sha1bytes([]byte(env.Name))
+	env.Icinga2.EndpointId = make([]byte, 20)
+
+	ha := createTestingHA(t, testbackends.RedisTestAddr)
+	err := ha.setAndInsertEnvironment(env)
+	require.NoError(t, err, "setAndInsertEnvironment should not return an error")
+
+	err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
+		return ha.upsertInstance(tx, &Environment{}, false)
+	})
+	require.NoError(t, err, "upsertInstance() should not return an error")
+
+	now := utils.TimeToMillisecs(time.Now())
+	activeUuid, err := uuid.NewRandom()
+	require.NoError(t, err, "UUID generation failed")
+	_, err = ha.super.Dbw.SqlExec(
+		mysqlObservers.insertIntoIcingadbInstance,
+		"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
+			" icinga2_version, icinga2_start_time)"+
+			" VALUES (?, ?, ?, ?, ?, ?, ?)",
+		activeUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], now, "", 0,
+	)
+	require.NoError(t, err, "This test needs a working MySQL connection!")
+
+	// Strictly older than the timeout: the delete uses "heartbeat < threshold", so a row exactly at
+	// the threshold would survive if the delete ran in the same millisecond as "now" above.
+	timedOut := now - heartbeatTimeoutMillisecs - 1
+	timedOutUuid, err := uuid.NewRandom()
+	require.NoError(t, err, "UUID generation failed")
+	_, err = ha.super.Dbw.SqlExec(
+		mysqlObservers.insertIntoIcingadbInstance,
+		"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
+			" icinga2_version, icinga2_start_time)"+
+			" VALUES (?, ?, ?, ?, ?, ?, ?)",
+		timedOutUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], timedOut, "", 0,
+	)
+	require.NoError(t, err, "This test needs a working MySQL connection!")
+
+	err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
+		return ha.removePreviousInstances(tx, env)
+	})
+	assert.NoError(t, err, "removePreviousInstances() should not return an error")
+
+	rows, err := ha.super.Dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance")
+	require.NoError(t, err, "SqlFetchAll() should not return an error")
+	var instanceIds []string
+	for _, row := range rows {
+		instanceIds = append(instanceIds, hex.EncodeToString(row[0].([]byte)))
+	}
+
+	assert.Contains(t, instanceIds, hex.EncodeToString(ha.uid[:]), "removePreviousInstances() should not remove its own row")
+	assert.Contains(t, instanceIds, hex.EncodeToString(activeUuid[:]), "removePreviousInstances() should not remove rows that are not timed out yet")
+	assert.NotContains(t, instanceIds, hex.EncodeToString(timedOutUuid[:]), "removePreviousInstances() should remove timed out instance")
+}