Retention: take environment_id into account

This commit ensures that the retention feature only deletes history from the
current environment. Previously, the deletion queries where missing an
appropriate WHERE clause.
This commit is contained in:
Julian Brost 2022-05-10 15:36:16 +02:00
parent 3c07730bb0
commit eccac78ff3
4 changed files with 104 additions and 31 deletions

View file

@ -21,10 +21,11 @@ type CleanupStmt struct {
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string { func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
switch driverName { switch driverName {
case driver.MySQL, "mysql": case driver.MySQL, "mysql":
return fmt.Sprintf(`DELETE FROM %[1]s WHERE %[2]s < :time ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit) return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
case driver.PostgreSQL, "postgres": case driver.PostgreSQL, "postgres":
return fmt.Sprintf(`WITH rows AS ( return fmt.Sprintf(`WITH rows AS (
SELECT %[1]s FROM %[2]s WHERE %[3]s < :time ORDER BY %[3]s LIMIT %[4]d SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
) )
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit) DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
default: default:
@ -34,13 +35,18 @@ DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table,
// CleanupOlderThan deletes all rows with the specified statement that are older than the given time. // CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
// Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted. // Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted.
func (db *DB) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint64, olderThan time.Time) (uint64, error) { func (db *DB) CleanupOlderThan(
ctx context.Context, stmt CleanupStmt, envId types.Binary, count uint64, olderThan time.Time,
) (uint64, error) {
var counter com.Counter var counter com.Counter
defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop() defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()
for { for {
q := db.Rebind(stmt.Build(db.DriverName(), count)) q := db.Rebind(stmt.Build(db.DriverName(), count))
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{types.UnixMilli(olderThan)}) rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
EnvironmentId: envId,
Time: types.UnixMilli(olderThan),
})
if err != nil { if err != nil {
return 0, internal.CantPerformQuery(err, q) return 0, internal.CantPerformQuery(err, q)
} }
@ -61,5 +67,6 @@ func (db *DB) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint
} }
type cleanupWhere struct { type cleanupWhere struct {
Time types.UnixMilli EnvironmentId types.Binary
Time types.UnixMilli
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -145,6 +146,11 @@ func (r *Retention) Start(ctx context.Context) error {
ctx, cancelCtx := context.WithCancel(ctx) ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx() defer cancelCtx()
e, ok := v1.EnvironmentFromContext(ctx)
if !ok {
return errors.New("can't get environment from context")
}
errs := make(chan error, 1) errs := make(chan error, 1)
for _, stmt := range RetentionStatements { for _, stmt := range RetentionStatements {
@ -179,7 +185,7 @@ func (r *Retention) Start(ctx context.Context) error {
r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s", r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
stmt.Category, stmt.Table, olderThan) stmt.Category, stmt.Table, olderThan)
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, r.count, olderThan) deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan)
if err != nil { if err != nil {
select { select {
case errs <- err: case errs <- err:

View file

@ -6,6 +6,7 @@ import (
"github.com/goccy/go-yaml" "github.com/goccy/go-yaml"
"github.com/icinga/icinga-testing/services" "github.com/icinga/icinga-testing/services"
"github.com/icinga/icinga-testing/utils/eventually" "github.com/icinga/icinga-testing/utils/eventually"
"github.com/icinga/icingadb/tests/internal/utils"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -14,9 +15,12 @@ import (
"time" "time"
) )
const MaxPlaceholders = 1 << 13
func TestCleanupAndRetention(t *testing.T) { func TestCleanupAndRetention(t *testing.T) {
r := it.RedisServerT(t)
i := it.Icinga2NodeT(t, "master")
i.EnableIcingaDb(r)
i.Reload()
rdb := getDatabase(t) rdb := getDatabase(t)
db, err := sqlx.Open(rdb.Driver(), rdb.DSN()) db, err := sqlx.Open(rdb.Driver(), rdb.DSN())
require.NoError(t, err, "connecting to SQL database shouldn't fail") require.NoError(t, err, "connecting to SQL database shouldn't fail")
@ -43,44 +47,52 @@ func TestCleanupAndRetention(t *testing.T) {
} }
} }
envId := utils.GetEnvironmentIdFromRedis(t, r)
otherEnvId := append([]byte(nil), envId...)
otherEnvId[0]++
rowsToDelete := 10000 rowsToDelete := 10000
rowsToSpare := 1000 rowsToSpare := 1000
rowsInOtherEnv := 1000
for category, stmt := range retentionStatements { for category, stmt := range retentionStatements {
err := dropNotNullColumns(db, stmt) err := dropNotNullColumns(db, stmt)
assert.NoError(t, err) assert.NoError(t, err)
retentionDays := daysForCategory(category) retentionDays := daysForCategory(category)
start := time.Now().AddDate(0, 0, -retentionDays).Add(-1 * time.Millisecond * time.Duration(rowsToDelete)) start := time.Now().AddDate(0, 0, -retentionDays)
startMilli := start.UnixMilli() startMilli := start.UnixMilli()
type row struct { type row struct {
Env []byte
Id []byte Id []byte
Time int64 Time int64
} }
values := make([]row, 0, MaxPlaceholders)
for j := 0; j < (rowsToDelete + rowsToSpare); j++ {
if j == rowsToDelete {
startMilli += int64(2 * time.Minute)
}
nextId := 1
getId := func() []byte {
id := make([]byte, 20) id := make([]byte, 20)
binary.LittleEndian.PutUint64(id, uint64(j)) binary.LittleEndian.PutUint64(id, uint64(nextId))
nextId++
values = append(values, row{id[:], startMilli + int64(j)}) return id
if len(values) == MaxPlaceholders || j == (rowsToDelete+rowsToSpare-1) {
_, err := db.NamedExec(fmt.Sprintf(`INSERT INTO %s (%s, %s) VALUES (:id, :time)`, stmt.Table, stmt.PK, stmt.Column), values)
require.NoError(t, err)
values = values[:0]
}
} }
values := make([]row, 0, rowsToDelete+rowsToSpare+rowsInOtherEnv)
for j := 0; j < rowsToDelete; j++ {
values = append(values, row{envId, getId(), startMilli - int64(j)})
}
for j := 0; j < rowsToSpare; j++ {
values = append(values, row{envId, getId(), startMilli + (2 * time.Minute).Milliseconds() + int64(j)})
}
for j := 0; j < rowsInOtherEnv; j++ {
values = append(values, row{otherEnvId, getId(), startMilli - int64(j)})
}
_, err = db.NamedExec(fmt.Sprintf(`INSERT INTO %s (environment_id, %s, %s) VALUES (:env, :id, :time)`,
stmt.Table, stmt.PK, stmt.Column), values)
require.NoError(t, err)
} }
r := it.RedisServerT(t)
i := it.Icinga2NodeT(t, "master")
i.EnableIcingaDb(r)
i.Reload()
waitForDumpDoneSignal(t, r, 20*time.Second, 100*time.Millisecond) waitForDumpDoneSignal(t, r, 20*time.Second, 100*time.Millisecond)
config, err := yaml.Marshal(struct { config, err := yaml.Marshal(struct {
Retention retention `yaml:"retention"` Retention retention `yaml:"retention"`
@ -96,14 +108,16 @@ func TestCleanupAndRetention(t *testing.T) {
var rowsLeft int var rowsLeft int
err := db.QueryRow( err := db.QueryRow(
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s < ?`, stmt.Table, stmt.Column)), db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s < ?`, stmt.Table, stmt.Column)),
envId,
thresholdMilli, thresholdMilli,
).Scan(&rowsLeft) ).Scan(&rowsLeft)
assert.NoError(t, err) assert.NoError(t, err)
var rowsSpared int var rowsSpared int
err = db.QueryRow( err = db.QueryRow(
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s >= ?`, stmt.Table, stmt.Column)), db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s >= ?`, stmt.Table, stmt.Column)),
envId,
thresholdMilli, thresholdMilli,
).Scan(&rowsSpared) ).Scan(&rowsSpared)
assert.NoError(t, err) assert.NoError(t, err)
@ -115,6 +129,15 @@ func TestCleanupAndRetention(t *testing.T) {
assert.Equal(t, 0, rowsLeft, "rows left in retention period for %s", category) assert.Equal(t, 0, rowsLeft, "rows left in retention period for %s", category)
assert.Equal(t, rowsToSpare, rowsSpared, "rows spared for %s", category) assert.Equal(t, rowsToSpare, rowsSpared, "rows spared for %s", category)
} }
var rowsSparedOtherEnv int
err = db.QueryRow(
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id <> ?`, stmt.Table)),
envId,
).Scan(&rowsSparedOtherEnv)
assert.NoError(t, err)
assert.Equal(t, rowsInOtherEnv, rowsSparedOtherEnv, "should not delete rows in other environment for %s", category)
} }
}, time.Minute, time.Second) }, time.Minute, time.Second)
} }
@ -189,9 +212,9 @@ func dropNotNullColumns(db *sqlx.DB, stmt cleanupStmt) error {
err := db.Select(&cols, db.Rebind(fmt.Sprintf(` err := db.Select(&cols, db.Rebind(fmt.Sprintf(`
SELECT column_name SELECT column_name
FROM information_schema.columns FROM information_schema.columns
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?) AND is_nullable = ?`, WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?, ?) AND is_nullable = ?`,
schema)), schema)),
stmt.Table, stmt.PK, stmt.Column, "NO") stmt.Table, "environment_id", stmt.PK, stmt.Column, "NO")
if err != nil { if err != nil {
return err return err
} }

View file

@ -0,0 +1,37 @@
package utils
import (
"context"
"encoding/hex"
"encoding/json"
"github.com/go-redis/redis/v8"
"github.com/icinga/icinga-testing/services"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func GetEnvironmentIdFromRedis(t *testing.T, r services.RedisServer) []byte {
conn := r.Open()
defer conn.Close()
heartbeat, err := conn.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{"icinga:stats", "0"},
Count: 1,
Block: 10 * time.Second,
}).Result()
require.NoError(t, err, "reading from icinga:stats failed")
require.NotEmpty(t, heartbeat, "response contains no streams")
require.NotEmpty(t, heartbeat[0].Messages, "response contains no messages")
require.Contains(t, heartbeat[0].Messages[0].Values, "icingadb_environment",
"icinga:stats message misses icingadb_environment")
var envIdHex string
err = json.Unmarshal([]byte(heartbeat[0].Messages[0].Values["icingadb_environment"].(string)), &envIdHex)
require.NoError(t, err, "cannot parse environment ID as a JSON string")
envId, err := hex.DecodeString(envIdHex)
require.NoError(t, err, "environment ID is not a hex string")
return envId
}