mirror of
https://github.com/Icinga/icingadb.git
synced 2026-02-18 18:18:00 -05:00
Retention: take environment_id into account
This commit ensures that the retention feature only deletes history from the current environment. Previously, the deletion queries where missing an appropriate WHERE clause.
This commit is contained in:
parent
3c07730bb0
commit
eccac78ff3
4 changed files with 104 additions and 31 deletions
|
|
@ -21,10 +21,11 @@ type CleanupStmt struct {
|
|||
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
|
||||
switch driverName {
|
||||
case driver.MySQL, "mysql":
|
||||
return fmt.Sprintf(`DELETE FROM %[1]s WHERE %[2]s < :time ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
|
||||
return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
|
||||
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
|
||||
case driver.PostgreSQL, "postgres":
|
||||
return fmt.Sprintf(`WITH rows AS (
|
||||
SELECT %[1]s FROM %[2]s WHERE %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
|
||||
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
|
||||
)
|
||||
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
|
||||
default:
|
||||
|
|
@ -34,13 +35,18 @@ DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table,
|
|||
|
||||
// CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
|
||||
// Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted.
|
||||
func (db *DB) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint64, olderThan time.Time) (uint64, error) {
|
||||
func (db *DB) CleanupOlderThan(
|
||||
ctx context.Context, stmt CleanupStmt, envId types.Binary, count uint64, olderThan time.Time,
|
||||
) (uint64, error) {
|
||||
var counter com.Counter
|
||||
defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()
|
||||
|
||||
for {
|
||||
q := db.Rebind(stmt.Build(db.DriverName(), count))
|
||||
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{types.UnixMilli(olderThan)})
|
||||
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
|
||||
EnvironmentId: envId,
|
||||
Time: types.UnixMilli(olderThan),
|
||||
})
|
||||
if err != nil {
|
||||
return 0, internal.CantPerformQuery(err, q)
|
||||
}
|
||||
|
|
@ -61,5 +67,6 @@ func (db *DB) CleanupOlderThan(ctx context.Context, stmt CleanupStmt, count uint
|
|||
}
|
||||
|
||||
type cleanupWhere struct {
|
||||
Time types.UnixMilli
|
||||
EnvironmentId types.Binary
|
||||
Time types.UnixMilli
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/pkg/errors"
|
||||
|
|
@ -145,6 +146,11 @@ func (r *Retention) Start(ctx context.Context) error {
|
|||
ctx, cancelCtx := context.WithCancel(ctx)
|
||||
defer cancelCtx()
|
||||
|
||||
e, ok := v1.EnvironmentFromContext(ctx)
|
||||
if !ok {
|
||||
return errors.New("can't get environment from context")
|
||||
}
|
||||
|
||||
errs := make(chan error, 1)
|
||||
|
||||
for _, stmt := range RetentionStatements {
|
||||
|
|
@ -179,7 +185,7 @@ func (r *Retention) Start(ctx context.Context) error {
|
|||
r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
|
||||
stmt.Category, stmt.Table, olderThan)
|
||||
|
||||
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, r.count, olderThan)
|
||||
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan)
|
||||
if err != nil {
|
||||
select {
|
||||
case errs <- err:
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/goccy/go-yaml"
|
||||
"github.com/icinga/icinga-testing/services"
|
||||
"github.com/icinga/icinga-testing/utils/eventually"
|
||||
"github.com/icinga/icingadb/tests/internal/utils"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -14,9 +15,12 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const MaxPlaceholders = 1 << 13
|
||||
|
||||
func TestCleanupAndRetention(t *testing.T) {
|
||||
r := it.RedisServerT(t)
|
||||
i := it.Icinga2NodeT(t, "master")
|
||||
i.EnableIcingaDb(r)
|
||||
i.Reload()
|
||||
|
||||
rdb := getDatabase(t)
|
||||
db, err := sqlx.Open(rdb.Driver(), rdb.DSN())
|
||||
require.NoError(t, err, "connecting to SQL database shouldn't fail")
|
||||
|
|
@ -43,44 +47,52 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
envId := utils.GetEnvironmentIdFromRedis(t, r)
|
||||
otherEnvId := append([]byte(nil), envId...)
|
||||
otherEnvId[0]++
|
||||
|
||||
rowsToDelete := 10000
|
||||
rowsToSpare := 1000
|
||||
rowsInOtherEnv := 1000
|
||||
|
||||
for category, stmt := range retentionStatements {
|
||||
err := dropNotNullColumns(db, stmt)
|
||||
assert.NoError(t, err)
|
||||
|
||||
retentionDays := daysForCategory(category)
|
||||
start := time.Now().AddDate(0, 0, -retentionDays).Add(-1 * time.Millisecond * time.Duration(rowsToDelete))
|
||||
start := time.Now().AddDate(0, 0, -retentionDays)
|
||||
startMilli := start.UnixMilli()
|
||||
|
||||
type row struct {
|
||||
Env []byte
|
||||
Id []byte
|
||||
Time int64
|
||||
}
|
||||
values := make([]row, 0, MaxPlaceholders)
|
||||
for j := 0; j < (rowsToDelete + rowsToSpare); j++ {
|
||||
if j == rowsToDelete {
|
||||
startMilli += int64(2 * time.Minute)
|
||||
}
|
||||
|
||||
nextId := 1
|
||||
getId := func() []byte {
|
||||
id := make([]byte, 20)
|
||||
binary.LittleEndian.PutUint64(id, uint64(j))
|
||||
|
||||
values = append(values, row{id[:], startMilli + int64(j)})
|
||||
|
||||
if len(values) == MaxPlaceholders || j == (rowsToDelete+rowsToSpare-1) {
|
||||
_, err := db.NamedExec(fmt.Sprintf(`INSERT INTO %s (%s, %s) VALUES (:id, :time)`, stmt.Table, stmt.PK, stmt.Column), values)
|
||||
require.NoError(t, err)
|
||||
values = values[:0]
|
||||
}
|
||||
binary.LittleEndian.PutUint64(id, uint64(nextId))
|
||||
nextId++
|
||||
return id
|
||||
}
|
||||
values := make([]row, 0, rowsToDelete+rowsToSpare+rowsInOtherEnv)
|
||||
|
||||
for j := 0; j < rowsToDelete; j++ {
|
||||
values = append(values, row{envId, getId(), startMilli - int64(j)})
|
||||
}
|
||||
for j := 0; j < rowsToSpare; j++ {
|
||||
values = append(values, row{envId, getId(), startMilli + (2 * time.Minute).Milliseconds() + int64(j)})
|
||||
}
|
||||
for j := 0; j < rowsInOtherEnv; j++ {
|
||||
values = append(values, row{otherEnvId, getId(), startMilli - int64(j)})
|
||||
}
|
||||
|
||||
_, err = db.NamedExec(fmt.Sprintf(`INSERT INTO %s (environment_id, %s, %s) VALUES (:env, :id, :time)`,
|
||||
stmt.Table, stmt.PK, stmt.Column), values)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
r := it.RedisServerT(t)
|
||||
i := it.Icinga2NodeT(t, "master")
|
||||
i.EnableIcingaDb(r)
|
||||
i.Reload()
|
||||
waitForDumpDoneSignal(t, r, 20*time.Second, 100*time.Millisecond)
|
||||
config, err := yaml.Marshal(struct {
|
||||
Retention retention `yaml:"retention"`
|
||||
|
|
@ -96,14 +108,16 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
|
||||
var rowsLeft int
|
||||
err := db.QueryRow(
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s < ?`, stmt.Table, stmt.Column)),
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s < ?`, stmt.Table, stmt.Column)),
|
||||
envId,
|
||||
thresholdMilli,
|
||||
).Scan(&rowsLeft)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var rowsSpared int
|
||||
err = db.QueryRow(
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s >= ?`, stmt.Table, stmt.Column)),
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s >= ?`, stmt.Table, stmt.Column)),
|
||||
envId,
|
||||
thresholdMilli,
|
||||
).Scan(&rowsSpared)
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -115,6 +129,15 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
assert.Equal(t, 0, rowsLeft, "rows left in retention period for %s", category)
|
||||
assert.Equal(t, rowsToSpare, rowsSpared, "rows spared for %s", category)
|
||||
}
|
||||
|
||||
var rowsSparedOtherEnv int
|
||||
err = db.QueryRow(
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id <> ?`, stmt.Table)),
|
||||
envId,
|
||||
).Scan(&rowsSparedOtherEnv)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, rowsInOtherEnv, rowsSparedOtherEnv, "should not delete rows in other environment for %s", category)
|
||||
}
|
||||
}, time.Minute, time.Second)
|
||||
}
|
||||
|
|
@ -189,9 +212,9 @@ func dropNotNullColumns(db *sqlx.DB, stmt cleanupStmt) error {
|
|||
err := db.Select(&cols, db.Rebind(fmt.Sprintf(`
|
||||
SELECT column_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?) AND is_nullable = ?`,
|
||||
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?, ?) AND is_nullable = ?`,
|
||||
schema)),
|
||||
stmt.Table, stmt.PK, stmt.Column, "NO")
|
||||
stmt.Table, "environment_id", stmt.PK, stmt.Column, "NO")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
37
tests/internal/utils/redis.go
Normal file
37
tests/internal/utils/redis.go
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icinga-testing/services"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func GetEnvironmentIdFromRedis(t *testing.T, r services.RedisServer) []byte {
|
||||
conn := r.Open()
|
||||
defer conn.Close()
|
||||
|
||||
heartbeat, err := conn.XRead(context.Background(), &redis.XReadArgs{
|
||||
Streams: []string{"icinga:stats", "0"},
|
||||
Count: 1,
|
||||
Block: 10 * time.Second,
|
||||
}).Result()
|
||||
require.NoError(t, err, "reading from icinga:stats failed")
|
||||
require.NotEmpty(t, heartbeat, "response contains no streams")
|
||||
require.NotEmpty(t, heartbeat[0].Messages, "response contains no messages")
|
||||
require.Contains(t, heartbeat[0].Messages[0].Values, "icingadb_environment",
|
||||
"icinga:stats message misses icingadb_environment")
|
||||
|
||||
var envIdHex string
|
||||
err = json.Unmarshal([]byte(heartbeat[0].Messages[0].Values["icingadb_environment"].(string)), &envIdHex)
|
||||
require.NoError(t, err, "cannot parse environment ID as a JSON string")
|
||||
|
||||
envId, err := hex.DecodeString(envIdHex)
|
||||
require.NoError(t, err, "environment ID is not a hex string")
|
||||
|
||||
return envId
|
||||
}
|
||||
Loading…
Reference in a new issue