icingadb/tests/cleanup_and_retention_test.go
Julian Brost eccac78ff3 Retention: take environment_id into account
This commit ensures that the retention feature only deletes history from the
current environment. Previously, the deletion queries were missing an
appropriate WHERE clause.
2022-05-13 17:19:03 +02:00

228 lines
5.9 KiB
Go

package icingadb_test
import (
"encoding/binary"
"fmt"
"github.com/goccy/go-yaml"
"github.com/icinga/icinga-testing/services"
"github.com/icinga/icinga-testing/utils/eventually"
"github.com/icinga/icingadb/tests/internal/utils"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"strings"
"testing"
"time"
)
// TestCleanupAndRetention checks that the history retention of Icinga DB deletes
// expired rows of the current environment, keeps rows that are still within the
// retention period, honors a per-category retention of 0 as "no cleanup", and
// never deletes rows belonging to another environment.
//
// For each category in retentionStatements, three groups of fixture rows are
// inserted: rowsToDelete expired rows of the current environment, rowsToSpare
// rows of the current environment that are newer than the threshold, and
// rowsInOtherEnv expired rows of a different environment. Icinga DB is then
// started with the retention configuration, and the row counts are polled until
// they match the expectations.
func TestCleanupAndRetention(t *testing.T) {
	r := it.RedisServerT(t)
	i := it.Icinga2NodeT(t, "master")
	i.EnableIcingaDb(r)
	i.Reload()

	rdb := getDatabase(t)
	db, err := sqlx.Open(rdb.Driver(), rdb.DSN())
	require.NoError(t, err, "connecting to SQL database shouldn't fail")
	t.Cleanup(func() { _ = db.Close() })

	reten := retention{
		HistoryDays: 7,
		SlaDays:     30,
		Options: map[string]int{
			"acknowledgement": 0, // No cleanup.
			"comment":         1,
			"downtime":        2,
			// notification and state default to 7.
		},
	}

	// daysForCategory returns the retention period this test expects for a
	// category: SlaDays for "sla_" categories, the explicit option if one is
	// set, and HistoryDays otherwise.
	daysForCategory := func(category string) int {
		if strings.HasPrefix(category, "sla_") {
			return reten.SlaDays
		}
		if d, ok := reten.Options[category]; ok {
			return d
		}
		return reten.HistoryDays
	}

	envId := utils.GetEnvironmentIdFromRedis(t, r)
	// Derive a different environment ID by copying envId and changing its first byte.
	otherEnvId := append([]byte(nil), envId...)
	otherEnvId[0]++

	rowsToDelete := 10000
	rowsToSpare := 1000
	rowsInOtherEnv := 1000

	// row holds the values bound to the :env, :id and :time named parameters
	// of the fixture INSERT below.
	type row struct {
		Env  []byte
		Id   []byte
		Time int64
	}

	for category, stmt := range retentionStatements {
		// Abort right away: with unrelated NOT NULL columns still present, the
		// INSERT below would fail with a less helpful error.
		require.NoError(t, dropNotNullColumns(db, stmt), "dropping NOT NULL columns of %s", stmt.Table)

		startMilli := time.Now().AddDate(0, 0, -daysForCategory(category)).UnixMilli()

		// IDs are 20-byte values carrying a little-endian counter in their first
		// 8 bytes, unique within the table of this category.
		nextId := 1
		getId := func() []byte {
			id := make([]byte, 20)
			binary.LittleEndian.PutUint64(id, uint64(nextId))
			nextId++
			return id
		}

		values := make([]row, 0, rowsToDelete+rowsToSpare+rowsInOtherEnv)
		// Expired rows of the current environment.
		for j := 0; j < rowsToDelete; j++ {
			values = append(values, row{envId, getId(), startMilli - int64(j)})
		}
		// Rows of the current environment within the retention period. The
		// two-minute margin keeps them newer than the threshold, which is
		// recomputed from the current time while waiting below.
		for j := 0; j < rowsToSpare; j++ {
			values = append(values, row{envId, getId(), startMilli + (2 * time.Minute).Milliseconds() + int64(j)})
		}
		// Expired rows of another environment, which must never be deleted.
		for j := 0; j < rowsInOtherEnv; j++ {
			values = append(values, row{otherEnvId, getId(), startMilli - int64(j)})
		}

		_, err = db.NamedExec(fmt.Sprintf(`INSERT INTO %s (environment_id, %s, %s) VALUES (:env, :id, :time)`,
			stmt.Table, stmt.PK, stmt.Column), values)
		require.NoError(t, err, "inserting fixtures into %s", stmt.Table)
	}

	waitForDumpDoneSignal(t, r, 20*time.Second, 100*time.Millisecond)

	config, err := yaml.Marshal(struct {
		Retention retention `yaml:"retention"`
	}{reten})
	// Abort right away: continuing with an empty config would start Icinga DB
	// without the retention settings under test.
	require.NoError(t, err, "marshalling retention config")

	it.IcingaDbInstanceT(t, r, rdb, services.WithIcingaDbConfig(string(config)))

	eventually.Assert(t, func(t require.TestingT) {
		for category, stmt := range retentionStatements {
			retentionDays := daysForCategory(category)
			// Computed inside the closure so that it follows the current time on
			// every evaluation.
			thresholdMilli := time.Now().AddDate(0, 0, -retentionDays).UnixMilli()

			// Rows of the current environment older than the threshold.
			var rowsLeft int
			err := db.QueryRow(
				db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s < ?`, stmt.Table, stmt.Column)),
				envId,
				thresholdMilli,
			).Scan(&rowsLeft)
			assert.NoError(t, err)

			// Rows of the current environment at or after the threshold.
			var rowsSpared int
			err = db.QueryRow(
				db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s >= ?`, stmt.Table, stmt.Column)),
				envId,
				thresholdMilli,
			).Scan(&rowsSpared)
			assert.NoError(t, err)

			if retentionDays == 0 {
				// A retention of 0 means no cleanup, so every inserted row of
				// the current environment must still exist.
				assert.Equal(t, rowsToDelete+rowsToSpare, rowsLeft+rowsSpared, "all rows should still be there for %s", category)
			} else {
				assert.Equal(t, 0, rowsLeft, "rows left in retention period for %s", category)
				assert.Equal(t, rowsToSpare, rowsSpared, "rows spared for %s", category)
			}

			// The other environment's rows are all expired but must survive.
			var rowsSparedOtherEnv int
			err = db.QueryRow(
				db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id <> ?`, stmt.Table)),
				envId,
			).Scan(&rowsSparedOtherEnv)
			assert.NoError(t, err)
			assert.Equal(t, rowsInOtherEnv, rowsSparedOtherEnv, "should not delete rows in other environment for %s", category)
		}
	}, time.Minute, time.Second)
}
// cleanupStmt describes where the retention test stores the fixture rows of one
// history category.
//   - Table is the history table.
//   - PK is the ID column, which the test fills with unique 20-byte values.
//   - Column holds the millisecond timestamp compared against the retention
//     threshold.
type cleanupStmt struct {
	Table, PK, Column string
}
// retention is marshalled as the "retention" section of the Icinga DB
// configuration that this test starts Icinga DB with.
type retention struct {
	// HistoryDays is the default retention in days for history categories.
	HistoryDays int `yaml:"history-days"`
	// SlaDays is the retention in days for the SLA history tables.
	SlaDays int `yaml:"sla-days"`
	// Options overrides HistoryDays per category; the test treats 0 as "no cleanup".
	Options map[string]int `yaml:"options"`
}
// retentionStatements maps every retention category exercised by the test to
// the table it affects, the table's ID column, and the timestamp column that
// is compared against the retention threshold. Keys starting with "sla_" are
// covered by the SLA retention; all others by the history retention.
var retentionStatements = map[string]cleanupStmt{
	// History tables.
	"acknowledgement": {Table: "acknowledgement_history", PK: "id", Column: "clear_time"},
	"comment":         {Table: "comment_history", PK: "comment_id", Column: "remove_time"},
	"downtime":        {Table: "downtime_history", PK: "downtime_id", Column: "end_time"},
	"flapping":        {Table: "flapping_history", PK: "id", Column: "end_time"},
	"notification":    {Table: "notification_history", PK: "id", Column: "send_time"},
	"state":           {Table: "state_history", PK: "id", Column: "event_time"},

	// SLA tables.
	"sla_downtime": {Table: "sla_history_downtime", PK: "downtime_id", Column: "downtime_end"},
	"sla_state":    {Table: "sla_history_state", PK: "id", Column: "event_time"},
}
// dropNotNullColumns drops every NOT NULL column of stmt.Table except
// environment_id, stmt.PK and stmt.Column. This lets test fixtures be inserted
// by specifying only those three columns.
//
// Only the "mysql" and "postgres" drivers are supported. Any other driver
// yields an error instead of a malformed information_schema query.
func dropNotNullColumns(db *sqlx.DB, stmt cleanupStmt) error {
	var schema string
	switch driver := db.DriverName(); driver {
	case "mysql":
		schema = `SCHEMA()`
	case "postgres":
		schema = `CURRENT_SCHEMA()`
	default:
		return fmt.Errorf("dropping NOT NULL columns of %s: unsupported database driver %q", stmt.Table, driver)
	}

	var cols []string
	err := db.Select(&cols, db.Rebind(fmt.Sprintf(`
		SELECT column_name
		FROM information_schema.columns
		WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?, ?) AND is_nullable = ?`,
		schema)),
		stmt.Table, "environment_id", stmt.PK, stmt.Column, "NO")
	if err != nil {
		return fmt.Errorf("querying NOT NULL columns of %s: %w", stmt.Table, err)
	}

	// Column names come from information_schema, not from user input.
	for _, col := range cols {
		if _, err := db.Exec(fmt.Sprintf(`ALTER TABLE %s DROP COLUMN %s`, stmt.Table, col)); err != nil {
			return fmt.Errorf("dropping column %s of %s: %w", col, stmt.Table, err)
		}
	}

	return nil
}