Remove previous rows from icingadb_instance table

To prevent this table from growing indefinitely, this now removes timed
out rows for the same environment_id and endpoint_id.
This commit is contained in:
Julian Brost 2021-01-21 12:23:16 +01:00
parent f3d21d5a95
commit 335ff91fb7
2 changed files with 75 additions and 0 deletions

View file

@ -55,6 +55,7 @@ var mysqlObservers = struct {
insertIntoEnvironment prometheus.Observer
selectIdHeartbeatResponsibleFromIcingadbInstanceByEnvironmentId prometheus.Observer
selectHeartbeatResponsibleFromIcingadbInstanceById prometheus.Observer
deleteIcingadbInstanceByEndpointId prometheus.Observer
}{
connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"),
connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"),
@ -62,6 +63,7 @@ var mysqlObservers = struct {
connection.DbIoSeconds.WithLabelValues("mysql", "insert into environment"),
connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat, responsible from icingadb_instance where environment_id = ourEnvID"),
connection.DbIoSeconds.WithLabelValues("mysql", "select heartbeat, responsible from icingadb_instance by id"),
connection.DbIoSeconds.WithLabelValues("mysql", "delete from icingadb_instance by endpoint_id"),
}
func (h *HA) setState(state State) {
@ -211,9 +213,26 @@ func (h *HA) setAndInsertEnvironment(env *Environment) error {
return err
}
// Remove rows from icingadb_instance that were created by previous startups of this instance.
// A row is considered to be created by this instance if it shares the same environment_id and
// endpoint_id. Rows with a recent heartbeat are never removed.
func (h *HA) removePreviousInstances(tx connection.DbTransaction, env *Environment) error {
heartbeatTimeoutThreshold := utils.TimeToMillisecs(time.Now()) - heartbeatTimeoutMillisecs
_, err := h.super.Dbw.SqlExecTx(tx, mysqlObservers.deleteIcingadbInstanceByEndpointId,
"DELETE FROM icingadb_instance "+
"WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?",
h.uid[:], h.super.EnvId, env.Icinga2.EndpointId, heartbeatTimeoutThreshold)
return err
}
func (h *HA) checkResponsibility(env *Environment) {
var newState State
err := h.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
err := h.removePreviousInstances(tx, env)
if err != nil {
return err
}
foundActive, activeId, err := h.getActiveInstance(tx)
if err != nil {
return err

View file

@ -4,6 +4,7 @@ package ha
import (
"crypto/sha1"
"encoding/hex"
"github.com/Icinga/icingadb/config/testbackends"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/supervisor"
@ -361,3 +362,58 @@ func TestHA_RegisterStateChangeListener(t *testing.T) {
ha.checkResponsibility(&Environment{})
assertNonBlockingState(StateAllInactive, chHA, "HA should send StateAllInactive to state change channel when becoming inactive")
}
func TestHA_removePreviousInstances(t *testing.T) {
env := &Environment{}
env.Name = "test"
env.ID = Sha1bytes([]byte(env.Name))
env.Icinga2.EndpointId = make([]byte, 20)
ha := createTestingHA(t, testbackends.RedisTestAddr)
err := ha.setAndInsertEnvironment(env)
require.NoError(t, err, "setAndInsertEnvironment should not return an error")
err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
return ha.upsertInstance(tx, &Environment{}, false)
})
require.NoError(t, err, "upsertInstance() should not return an error")
now := utils.TimeToMillisecs(time.Now())
activeUuid, err := uuid.NewRandom()
require.NoError(t, err, "UUID generation failed")
_, err = ha.super.Dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?, ?)",
activeUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], now, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
timedOut := now - heartbeatTimeoutMillisecs
timedOutUuid, err := uuid.NewRandom()
require.NoError(t, err, "UUID generation failed")
_, err = ha.super.Dbw.SqlExec(
mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, endpoint_id, responsible, heartbeat,"+
" icinga2_version, icinga2_start_time)"+
" VALUES (?, ?, ?, ?, ?, ?, ?)",
timedOutUuid[:], ha.super.EnvId, env.Icinga2.EndpointId, utils.Bool[false], timedOut, "", 0,
)
require.NoError(t, err, "This test needs a working MySQL connection!")
err = ha.super.Dbw.SqlTransaction(true, false, false, func(tx connection.DbTransaction) error {
return ha.removePreviousInstances(tx, env)
})
assert.NoError(t, err, "removePreviousInstances() should not return an error")
rows, err := ha.super.Dbw.SqlFetchAll(mysqlTestObserver, "SELECT id FROM icingadb_instance")
var instanceIds []string
for _, row := range rows {
instanceIds = append(instanceIds, hex.EncodeToString(row[0].([]byte)))
}
assert.Contains(t, instanceIds, hex.EncodeToString(ha.uid[:]), "removePreviousInstance() should not remove its own row")
assert.Contains(t, instanceIds, hex.EncodeToString(activeUuid[:]), "removePreviousInstance() should not remove rows that are not timed out yet")
assert.NotContains(t, instanceIds, hex.EncodeToString(timedOutUuid[:]), "removePreviousInstances() should remove timed out instance")
}