Retention: take environment_id into account

This commit ensures that the retention feature only deletes history from the
current environment. Previously, the deletion queries were missing an
appropriate WHERE clause.
This commit is contained in:
Julian Brost 2022-05-10 15:36:16 +02:00
parent 3c07730bb0
commit eccac78ff3
4 changed files with 104 additions and 31 deletions

View file

@ -21,10 +21,11 @@ type CleanupStmt struct {
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
switch driverName {
case driver.MySQL, "mysql":
return fmt.Sprintf(`DELETE FROM %[1]s WHERE %[2]s < :time ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
case driver.PostgreSQL, "postgres":
return fmt.Sprintf(`WITH rows AS (
SELECT %[1]s FROM %[2]s WHERE %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
)
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
default:
@ -34,13 +35,18 @@ DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table,
// CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
// Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted.
func (db *DB) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint64, olderThan time.Time) (uint64, error) {
func (db *DB) CleanupOlderThan(
ctx context.Context, stmt CleanupStmt, envId types.Binary, count uint64, olderThan time.Time,
) (uint64, error) {
var counter com.Counter
defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()
for {
q := db.Rebind(stmt.Build(db.DriverName(), count))
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{types.UnixMilli(olderThan)})
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
EnvironmentId: envId,
Time: types.UnixMilli(olderThan),
})
if err != nil {
return 0, internal.CantPerformQuery(err, q)
}
@ -61,5 +67,6 @@ func (db *DB) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint
}
type cleanupWhere struct {
Time types.UnixMilli
EnvironmentId types.Binary
Time types.UnixMilli
}

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
@ -145,6 +146,11 @@ func (r *Retention) Start(ctx context.Context) error {
ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx()
e, ok := v1.EnvironmentFromContext(ctx)
if !ok {
return errors.New("can't get environment from context")
}
errs := make(chan error, 1)
for _, stmt := range RetentionStatements {
@ -179,7 +185,7 @@ func (r *Retention) Start(ctx context.Context) error {
r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
stmt.Category, stmt.Table, olderThan)
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, r.count, olderThan)
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan)
if err != nil {
select {
case errs <- err:

View file

@ -6,6 +6,7 @@ import (
"github.com/goccy/go-yaml"
"github.com/icinga/icinga-testing/services"
"github.com/icinga/icinga-testing/utils/eventually"
"github.com/icinga/icingadb/tests/internal/utils"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -14,9 +15,12 @@ import (
"time"
)
const MaxPlaceholders = 1 << 13
func TestCleanupAndRetention(t *testing.T) {
r := it.RedisServerT(t)
i := it.Icinga2NodeT(t, "master")
i.EnableIcingaDb(r)
i.Reload()
rdb := getDatabase(t)
db, err := sqlx.Open(rdb.Driver(), rdb.DSN())
require.NoError(t, err, "connecting to SQL database shouldn't fail")
@ -43,44 +47,52 @@ func TestCleanupAndRetention(t *testing.T) {
}
}
envId := utils.GetEnvironmentIdFromRedis(t, r)
otherEnvId := append([]byte(nil), envId...)
otherEnvId[0]++
rowsToDelete := 10000
rowsToSpare := 1000
rowsInOtherEnv := 1000
for category, stmt := range retentionStatements {
err := dropNotNullColumns(db, stmt)
assert.NoError(t, err)
retentionDays := daysForCategory(category)
start := time.Now().AddDate(0, 0, -retentionDays).Add(-1 * time.Millisecond * time.Duration(rowsToDelete))
start := time.Now().AddDate(0, 0, -retentionDays)
startMilli := start.UnixMilli()
type row struct {
Env []byte
Id []byte
Time int64
}
values := make([]row, 0, MaxPlaceholders)
for j := 0; j < (rowsToDelete + rowsToSpare); j++ {
if j == rowsToDelete {
startMilli += int64(2 * time.Minute)
}
nextId := 1
getId := func() []byte {
id := make([]byte, 20)
binary.LittleEndian.PutUint64(id, uint64(j))
values = append(values, row{id[:], startMilli + int64(j)})
if len(values) == MaxPlaceholders || j == (rowsToDelete+rowsToSpare-1) {
_, err := db.NamedExec(fmt.Sprintf(`INSERT INTO %s (%s, %s) VALUES (:id, :time)`, stmt.Table, stmt.PK, stmt.Column), values)
require.NoError(t, err)
values = values[:0]
}
binary.LittleEndian.PutUint64(id, uint64(nextId))
nextId++
return id
}
values := make([]row, 0, rowsToDelete+rowsToSpare+rowsInOtherEnv)
for j := 0; j < rowsToDelete; j++ {
values = append(values, row{envId, getId(), startMilli - int64(j)})
}
for j := 0; j < rowsToSpare; j++ {
values = append(values, row{envId, getId(), startMilli + (2 * time.Minute).Milliseconds() + int64(j)})
}
for j := 0; j < rowsInOtherEnv; j++ {
values = append(values, row{otherEnvId, getId(), startMilli - int64(j)})
}
_, err = db.NamedExec(fmt.Sprintf(`INSERT INTO %s (environment_id, %s, %s) VALUES (:env, :id, :time)`,
stmt.Table, stmt.PK, stmt.Column), values)
require.NoError(t, err)
}
r := it.RedisServerT(t)
i := it.Icinga2NodeT(t, "master")
i.EnableIcingaDb(r)
i.Reload()
waitForDumpDoneSignal(t, r, 20*time.Second, 100*time.Millisecond)
config, err := yaml.Marshal(struct {
Retention retention `yaml:"retention"`
@ -96,14 +108,16 @@ func TestCleanupAndRetention(t *testing.T) {
var rowsLeft int
err := db.QueryRow(
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s < ?`, stmt.Table, stmt.Column)),
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s < ?`, stmt.Table, stmt.Column)),
envId,
thresholdMilli,
).Scan(&rowsLeft)
assert.NoError(t, err)
var rowsSpared int
err = db.QueryRow(
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s >= ?`, stmt.Table, stmt.Column)),
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s >= ?`, stmt.Table, stmt.Column)),
envId,
thresholdMilli,
).Scan(&rowsSpared)
assert.NoError(t, err)
@ -115,6 +129,15 @@ func TestCleanupAndRetention(t *testing.T) {
assert.Equal(t, 0, rowsLeft, "rows left in retention period for %s", category)
assert.Equal(t, rowsToSpare, rowsSpared, "rows spared for %s", category)
}
var rowsSparedOtherEnv int
err = db.QueryRow(
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id <> ?`, stmt.Table)),
envId,
).Scan(&rowsSparedOtherEnv)
assert.NoError(t, err)
assert.Equal(t, rowsInOtherEnv, rowsSparedOtherEnv, "should not delete rows in other environment for %s", category)
}
}, time.Minute, time.Second)
}
@ -189,9 +212,9 @@ func dropNotNullColumns(db *sqlx.DB, stmt cleanupStmt) error {
err := db.Select(&cols, db.Rebind(fmt.Sprintf(`
SELECT column_name
FROM information_schema.columns
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?) AND is_nullable = ?`,
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?, ?) AND is_nullable = ?`,
schema)),
stmt.Table, stmt.PK, stmt.Column, "NO")
stmt.Table, "environment_id", stmt.PK, stmt.Column, "NO")
if err != nil {
return err
}

View file

@ -0,0 +1,37 @@
package utils
import (
"context"
"encoding/hex"
"encoding/json"
"github.com/go-redis/redis/v8"
"github.com/icinga/icinga-testing/services"
"github.com/stretchr/testify/require"
"testing"
"time"
)
// GetEnvironmentIdFromRedis reads a single message from the icinga:stats stream (starting at ID 0)
// of the given Redis server and returns the environment ID stored in its icingadb_environment field,
// decoded from its hex representation into raw bytes.
//
// The read blocks for up to 10 seconds. Any failure (read error, empty response, missing field,
// non-string value, invalid JSON or invalid hex) fails the test immediately via require.
func GetEnvironmentIdFromRedis(t *testing.T, r services.RedisServer) []byte {
	// Attribute failures raised in here to the calling test rather than to this helper.
	t.Helper()

	conn := r.Open()
	defer conn.Close()

	heartbeat, err := conn.XRead(context.Background(), &redis.XReadArgs{
		Streams: []string{"icinga:stats", "0"},
		Count:   1,
		Block:   10 * time.Second,
	}).Result()
	require.NoError(t, err, "reading from icinga:stats failed")
	require.NotEmpty(t, heartbeat, "response contains no streams")
	require.NotEmpty(t, heartbeat[0].Messages, "response contains no messages")

	values := heartbeat[0].Messages[0].Values
	require.Contains(t, values, "icingadb_environment",
		"icinga:stats message misses icingadb_environment")

	// Use the comma-ok form so an unexpected value type fails the test with a message instead of panicking.
	envIdJson, ok := values["icingadb_environment"].(string)
	require.True(t, ok, "icingadb_environment in icinga:stats message is not a string")

	// The field is parsed as a JSON string whose content is the hex-encoded environment ID.
	var envIdHex string
	err = json.Unmarshal([]byte(envIdJson), &envIdHex)
	require.NoError(t, err, "cannot parse environment ID as a JSON string")

	envId, err := hex.DecodeString(envIdHex)
	require.NoError(t, err, "environment ID is not a hex string")

	return envId
}