diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go index b5ba7aab..23e27992 100644 --- a/cmd/icingadb-migrate/main.go +++ b/cmd/icingadb-migrate/main.go @@ -8,7 +8,6 @@ import ( "fmt" "github.com/creasty/defaults" "github.com/goccy/go-yaml" - "github.com/icinga/icingadb/internal/config" "github.com/icinga/icingadb/pkg/database" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" @@ -42,11 +41,11 @@ type Flags struct { // Config defines the YAML config structure. type Config struct { IDO struct { - config.Database `yaml:"-,inline"` + database.Config `yaml:"-,inline"` From int32 `yaml:"from"` To int32 `yaml:"to" default:"2147483647"` } `yaml:"ido"` - IcingaDB config.Database `yaml:"icingadb"` + IcingaDB database.Config `yaml:"icingadb"` // Icinga2 specifies information the IDO doesn't provide. Icinga2 struct { // Env specifies the environment ID, hex. @@ -176,7 +175,7 @@ func connectAll(c *Config) (ido, idb *database.DB) { eg, _ := errgroup.WithContext(context.Background()) eg.Go(func() error { - ido = connect("IDO", &c.IDO.Database) + ido = connect("IDO", &c.IDO.Config) return nil }) @@ -190,8 +189,12 @@ func connectAll(c *Config) (ido, idb *database.DB) { } // connect connects to which DB as cfg specifies. (On non-recoverable errors the whole program exits.) -func connect(which string, cfg *config.Database) *database.DB { - db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second)) +func connect(which string, cfg *database.Config) *database.DB { + db, err := database.NewDbFromConfig( + cfg, + logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second), + database.RetryConnectorCallbacks{}, + ) if err != nil { log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) } diff --git a/internal/command/command.go b/internal/command/command.go index 2d296bfc..bd15dbf9 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -6,10 +6,12 @@ import ( "github.com/icinga/icingadb/internal/config" "github.com/icinga/icingadb/pkg/database" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" goflags "github.com/jessevdk/go-flags" "github.com/pkg/errors" "os" + "time" ) // Command provides factories for creating Redis and Database connections from Config. @@ -49,7 +51,14 @@ func New() *Command { // Database creates and returns a new icingadb.DB connection from config.Config. func (c Command) Database(l *logging.Logger) (*database.DB, error) { - return c.Config.Database.Open(l) + return database.NewDbFromConfig(&c.Config.Database, l, database.RetryConnectorCallbacks{ + OnRetryableError: func(_ time.Duration, _ uint64, err, _ error) { + telemetry.UpdateCurrentDbConnErr(err) + }, + OnSuccess: func(_ time.Duration, _ uint64, _ error) { + telemetry.UpdateCurrentDbConnErr(nil) + }, + }) } // Redis creates and returns a new icingaredis.Client connection from config.Config. diff --git a/internal/config/config.go b/internal/config/config.go index 4b193e74..268b9928 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "github.com/creasty/defaults" "github.com/goccy/go-yaml" + "github.com/icinga/icingadb/pkg/database" "github.com/icinga/icingadb/pkg/logging" "github.com/jessevdk/go-flags" "github.com/pkg/errors" @@ -11,10 +12,10 @@ import ( // Config defines Icinga DB config. type Config struct { - Database Database `yaml:"database"` - Redis Redis `yaml:"redis"` - Logging logging.Config `yaml:"logging"` - Retention Retention `yaml:"retention"` + Database database.Config `yaml:"database"` + Redis Redis `yaml:"redis"` + Logging logging.Config `yaml:"logging"` + Retention Retention `yaml:"retention"` } // Validate checks constraints in the supplied configuration and returns an error if they are violated. diff --git a/internal/config/database.go b/internal/config/database.go deleted file mode 100644 index f9edc0eb..00000000 --- a/internal/config/database.go +++ /dev/null @@ -1,221 +0,0 @@ -package config - -import ( - "context" - "database/sql" - "database/sql/driver" - "fmt" - "github.com/go-sql-driver/mysql" - "github.com/icinga/icingadb/pkg/config" - "github.com/icinga/icingadb/pkg/database" - "github.com/icinga/icingadb/pkg/icingaredis/telemetry" - "github.com/icinga/icingadb/pkg/logging" - "github.com/icinga/icingadb/pkg/strcase" - "github.com/icinga/icingadb/pkg/utils" - "github.com/jmoiron/sqlx" - "github.com/jmoiron/sqlx/reflectx" - "github.com/lib/pq" - "github.com/pkg/errors" - "net" - "net/url" - "strconv" - "time" -) - -// Database defines database client configuration. -type Database struct { - Type string `yaml:"type" default:"mysql"` - Host string `yaml:"host"` - Port int `yaml:"port"` - Database string `yaml:"database"` - User string `yaml:"user"` - Password string `yaml:"password"` - TlsOptions config.TLS `yaml:",inline"` - Options database.Options `yaml:"options"` -} - -// Open prepares the DSN string and driver configuration, -// calls sqlx.Open, but returns *icingadb.DB. -func (d *Database) Open(logger *logging.Logger) (*database.DB, error) { - var db *sqlx.DB - - connectorCallbacks := database.RetryConnectorCallbacks{ - OnRetryableError: func(_ time.Duration, _ uint64, err, _ error) { - telemetry.UpdateCurrentDbConnErr(err) - }, - OnSuccess: func(_ time.Duration, _ uint64, _ error) { - telemetry.UpdateCurrentDbConnErr(nil) - }, - } - - switch d.Type { - case "mysql": - config := mysql.NewConfig() - - config.User = d.User - config.Passwd = d.Password - config.Logger = database.MysqlFuncLogger(logger.Debug) - - if utils.IsUnixAddr(d.Host) { - config.Net = "unix" - config.Addr = d.Host - } else { - config.Net = "tcp" - port := d.Port - if port == 0 { - port = 3306 - } - config.Addr = net.JoinHostPort(d.Host, fmt.Sprint(port)) - } - - config.DBName = d.Database - config.Timeout = time.Minute - config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"} - - tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) - if err != nil { - return nil, err - } - - if tlsConfig != nil { - config.TLSConfig = "icingadb" - if err := mysql.RegisterTLSConfig(config.TLSConfig, tlsConfig); err != nil { - return nil, errors.Wrap(err, "can't register TLS config") - } - } - - c, err := mysql.NewConnector(config) - if err != nil { - return nil, errors.Wrap(err, "can't open mysql database") - } - - connectorCallbacks.OnInitConn = func(ctx context.Context, conn driver.Conn) error { - return setGaleraOpts(ctx, conn, int64(d.Options.WsrepSyncWait)) - } - - db = sqlx.NewDb(sql.OpenDB(database.NewConnector(c, logger, connectorCallbacks)), database.MySQL) - case "pgsql": - uri := &url.URL{ - Scheme: "postgres", - User: url.UserPassword(d.User, d.Password), - Path: "/" + url.PathEscape(d.Database), - } - - query := url.Values{ - "connect_timeout": {"60"}, - "binary_parameters": {"yes"}, - - // Host and port can alternatively be specified in the query string. lib/pq can't parse the connection URI - // if a Unix domain socket path is specified in the host part of the URI, therefore always use the query - // string. See also https://github.com/lib/pq/issues/796 - "host": {d.Host}, - } - if d.Port != 0 { - query["port"] = []string{strconv.FormatInt(int64(d.Port), 10)} - } - - if _, err := d.TlsOptions.MakeConfig(d.Host); err != nil { - return nil, err - } - - if d.TlsOptions.Enable { - if d.TlsOptions.Insecure { - query["sslmode"] = []string{"require"} - } else { - query["sslmode"] = []string{"verify-full"} - } - - if d.TlsOptions.Cert != "" { - query["sslcert"] = []string{d.TlsOptions.Cert} - } - - if d.TlsOptions.Key != "" { - query["sslkey"] = []string{d.TlsOptions.Key} - } - - if d.TlsOptions.Ca != "" { - query["sslrootcert"] = []string{d.TlsOptions.Ca} - } - } else { - query["sslmode"] = []string{"disable"} - } - - uri.RawQuery = query.Encode() - - connector, err := pq.NewConnector(uri.String()) - if err != nil { - return nil, errors.Wrap(err, "can't open pgsql database") - } - - db = sqlx.NewDb(sql.OpenDB(database.NewConnector(connector, logger, connectorCallbacks)), database.PostgreSQL) - default: - return nil, unknownDbType(d.Type) - } - - db.SetMaxIdleConns(d.Options.MaxConnections / 3) - db.SetMaxOpenConns(d.Options.MaxConnections) - - db.Mapper = reflectx.NewMapperFunc("db", strcase.Snake) - - return database.NewDb(db, logger, &d.Options), nil -} - -// Validate checks constraints in the supplied database configuration and returns an error if they are violated. -func (d *Database) Validate() error { - switch d.Type { - case "mysql", "pgsql": - default: - return unknownDbType(d.Type) - } - - if d.Host == "" { - return errors.New("database host missing") - } - - if d.User == "" { - return errors.New("database user missing") - } - - if d.Database == "" { - return errors.New("database name missing") - } - - return d.Options.Validate() -} - -func unknownDbType(t string) error { - return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t) -} - -// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed -// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key -// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes, -// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped. -// -// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait -func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { - const galeraOpts = "SET SESSION wsrep_sync_wait=?" - - stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) - if err != nil { - if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable - return nil - } - - return errors.Wrap(err, "cannot prepare "+galeraOpts) - } - // This is just for an unexpected exit and any returned error can safely be ignored and in case - // of the normal function exit, the stmt is closed manually, and its error is handled gracefully. - defer func() { _ = stmt.Close() }() - - _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) - if err != nil { - return errors.Wrap(err, "cannot execute "+galeraOpts) - } - - if err = stmt.Close(); err != nil { - return errors.Wrap(err, "cannot close prepared statement "+galeraOpts) - } - - return nil -} diff --git a/pkg/database/config.go b/pkg/database/config.go new file mode 100644 index 00000000..c27b803f --- /dev/null +++ b/pkg/database/config.go @@ -0,0 +1,45 @@ +package database + +import ( + "github.com/icinga/icingadb/pkg/config" + "github.com/pkg/errors" +) + +// Config defines database client configuration. +type Config struct { + Type string `yaml:"type" default:"mysql"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + User string `yaml:"user"` + Password string `yaml:"password"` + TlsOptions config.TLS `yaml:",inline"` + Options Options `yaml:"options"` +} + +// Validate checks constraints in the supplied database configuration and returns an error if they are violated. +func (c *Config) Validate() error { + switch c.Type { + case "mysql", "pgsql": + default: + return unknownDbType(c.Type) + } + + if c.Host == "" { + return errors.New("database host missing") + } + + if c.User == "" { + return errors.New("database user missing") + } + + if c.Database == "" { + return errors.New("database name missing") + } + + return c.Options.Validate() +} + +func unknownDbType(t string) error { + return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t) +} diff --git a/pkg/database/db.go b/pkg/database/db.go index 79ca40ce..9ae8f0b9 100644 --- a/pkg/database/db.go +++ b/pkg/database/db.go @@ -2,18 +2,28 @@ package database import ( "context" + "database/sql" + "database/sql/driver" "fmt" + "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/retry" + "github.com/icinga/icingadb/pkg/strcase" + "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" + "github.com/jmoiron/sqlx/reflectx" + "github.com/lib/pq" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" + "net" + "net/url" "reflect" + "strconv" "strings" "sync" "time" @@ -80,7 +90,7 @@ func (o *Options) Validate() error { return nil } -// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. +// NewDb returns a new DB wrapper for a pre-existing sqlx.DB. func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { return &DB{ DB: db, @@ -90,6 +100,124 @@ func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { } } +// NewDbFromConfig returns a new DB from Config. +func NewDbFromConfig(c *Config, logger *logging.Logger, connectorCallbacks RetryConnectorCallbacks) (*DB, error) { + var db *sqlx.DB + + switch c.Type { + case "mysql": + config := mysql.NewConfig() + + config.User = c.User + config.Passwd = c.Password + config.Logger = MysqlFuncLogger(logger.Debug) + + if utils.IsUnixAddr(c.Host) { + config.Net = "unix" + config.Addr = c.Host + } else { + config.Net = "tcp" + port := c.Port + if port == 0 { + port = 3306 + } + config.Addr = net.JoinHostPort(c.Host, fmt.Sprint(port)) + } + + config.DBName = c.Database + config.Timeout = time.Minute + config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"} + + tlsConfig, err := c.TlsOptions.MakeConfig(c.Host) + if err != nil { + return nil, err + } + + config.TLS = tlsConfig + + connector, err := mysql.NewConnector(config) + if err != nil { + return nil, errors.Wrap(err, "can't open mysql database") + } + + onInitConn := connectorCallbacks.OnInitConn + connectorCallbacks.OnInitConn = func(ctx context.Context, conn driver.Conn) error { + if onInitConn != nil { + if err := onInitConn(ctx, conn); err != nil { + return err + } + } + + return setGaleraOpts(ctx, conn, int64(c.Options.WsrepSyncWait)) + } + + db = sqlx.NewDb(sql.OpenDB(NewConnector(connector, logger, connectorCallbacks)), MySQL) + case "pgsql": + uri := &url.URL{ + Scheme: "postgres", + User: url.UserPassword(c.User, c.Password), + Path: "/" + url.PathEscape(c.Database), + } + + query := url.Values{ + "connect_timeout": {"60"}, + "binary_parameters": {"yes"}, + + // Host and port can alternatively be specified in the query string. lib/pq can't parse the connection URI + // if a Unix domain socket path is specified in the host part of the URI, therefore always use the query + // string. See also https://github.com/lib/pq/issues/796 + "host": {c.Host}, + } + if c.Port != 0 { + query["port"] = []string{strconv.FormatInt(int64(c.Port), 10)} + } + + if _, err := c.TlsOptions.MakeConfig(c.Host); err != nil { + return nil, err + } + + if c.TlsOptions.Enable { + if c.TlsOptions.Insecure { + query["sslmode"] = []string{"require"} + } else { + query["sslmode"] = []string{"verify-full"} + } + + if c.TlsOptions.Cert != "" { + query["sslcert"] = []string{c.TlsOptions.Cert} + } + + if c.TlsOptions.Key != "" { + query["sslkey"] = []string{c.TlsOptions.Key} + } + + if c.TlsOptions.Ca != "" { + query["sslrootcert"] = []string{c.TlsOptions.Ca} + } + } else { + query["sslmode"] = []string{"disable"} + } + + uri.RawQuery = query.Encode() + + connector, err := pq.NewConnector(uri.String()) + if err != nil { + return nil, errors.Wrap(err, "can't open pgsql database") + } + + db = sqlx.NewDb(sql.OpenDB(NewConnector(connector, logger, connectorCallbacks)), PostgreSQL) + default: + return nil, unknownDbType(c.Type) + } + + db.SetMaxIdleConns(c.Options.MaxConnections / 3) + db.SetMaxOpenConns(c.Options.MaxConnections) + + db.Mapper = reflectx.NewMapperFunc("db", strcase.Snake) + + return NewDb(db, logger, &c.Options), nil +} + // BuildColumns returns all columns of the given struct. func (db *DB) BuildColumns(subject interface{}) []string { fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names @@ -265,7 +393,7 @@ func (db *DB) BulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any], ) error { var counter com.Counter - defer db.log(ctx, query, &counter).Stop() + defer db.Log(ctx, query, &counter).Stop() g, ctx := errgroup.WithContext(ctx) // Use context from group. @@ -333,7 +461,7 @@ func (db *DB) NamedBulkExec( splitPolicyFactory com.BulkChunkSplitPolicyFactory[Entity], onSuccess ...OnSuccess[Entity], ) error { var counter com.Counter - defer db.log(ctx, query, &counter).Stop() + defer db.Log(ctx, query, &counter).Stop() g, ctx := errgroup.WithContext(ctx) bulk := com.Bulk(ctx, arg, count, splitPolicyFactory) @@ -397,7 +525,7 @@ func (db *DB) NamedBulkExecTx( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan Entity, ) error { var counter com.Counter - defer db.log(ctx, query, &counter).Stop() + defer db.Log(ctx, query, &counter).Stop() g, ctx := errgroup.WithContext(ctx) bulk := com.Bulk(ctx, arg, count, com.NeverSplit[Entity]) @@ -480,7 +608,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc EntityFactoryFunc, query g.Go(func() error { var counter com.Counter - defer db.log(ctx, query, &counter).Stop() + defer db.Log(ctx, query, &counter).Stop() defer close(entities) rows, err := db.NamedQueryContext(ctx, query, scope) @@ -652,7 +780,7 @@ func (db *DB) GetDefaultRetrySettings() retry.Settings { } } -func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { +func (db *DB) Log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { if count := counter.Reset(); count > 0 { db.logger.Debugf("Executed %q with %d rows", query, count) diff --git a/pkg/database/utils.go b/pkg/database/utils.go index e9eb681d..04037030 100644 --- a/pkg/database/utils.go +++ b/pkg/database/utils.go @@ -1,6 +1,9 @@ package database import ( + "context" + "database/sql/driver" + "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/strcase" "github.com/icinga/icingadb/pkg/types" @@ -40,6 +43,39 @@ func SplitOnDupId[T IDer]() com.BulkChunkSplitPolicy[T] { } } +// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed +// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key +// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes, +// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped. +// +// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" + + stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) + if err != nil { + if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable + return nil + } + + return errors.Wrap(err, "cannot prepare "+galeraOpts) + } + // This is just for an unexpected exit and any returned error can safely be ignored and in case + // of the normal function exit, the stmt is closed manually, and its error is handled gracefully. + defer func() { _ = stmt.Close() }() + + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + return errors.Wrap(err, "cannot execute "+galeraOpts) + } + + if err = stmt.Close(); err != nil { + return errors.Wrap(err, "cannot close prepared statement "+galeraOpts) + } + + return nil +} + var ( _ com.BulkChunkSplitPolicyFactory[Entity] = SplitOnDupId[Entity] ) diff --git a/pkg/database/cleanup.go b/pkg/icingadb/cleanup.go similarity index 78% rename from pkg/database/cleanup.go rename to pkg/icingadb/cleanup.go index b8690661..50eebf7c 100644 --- a/pkg/database/cleanup.go +++ b/pkg/icingadb/cleanup.go @@ -1,10 +1,11 @@ -package database +package icingadb import ( "context" "fmt" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/database" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/types" "time" @@ -17,34 +18,18 @@ type CleanupStmt struct { Column string } -// Build assembles the cleanup statement for the specified database driver with the given limit. -func (stmt *CleanupStmt) Build(driverName string, limit uint64) string { - switch driverName { - case MySQL: - return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time -ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit) - case PostgreSQL: - return fmt.Sprintf(`WITH rows AS ( -SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d -) -DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit) - default: - panic(fmt.Sprintf("invalid database type %s", driverName)) - } -} - // CleanupOlderThan deletes all rows with the specified statement that are older than the given time. // Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess. // Returns the total number of rows deleted. -func (db *DB) CleanupOlderThan( - ctx context.Context, stmt CleanupStmt, envId types.Binary, - count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], +func (stmt *CleanupStmt) CleanupOlderThan( + ctx context.Context, db *database.DB, envId types.Binary, + count uint64, olderThan time.Time, onSuccess ...database.OnSuccess[struct{}], ) (uint64, error) { var counter com.Counter - q := db.Rebind(stmt.Build(db.DriverName(), count)) + q := db.Rebind(stmt.build(db.DriverName(), count)) - defer db.log(ctx, q, &counter).Stop() + defer db.Log(ctx, q, &counter).Stop() for { var rowsDeleted int64 @@ -57,7 +42,7 @@ func (db *DB) CleanupOlderThan( Time: types.UnixMilli(olderThan), }) if err != nil { - return CantPerformQuery(err, q) + return database.CantPerformQuery(err, q) } rowsDeleted, err = rs.RowsAffected() @@ -88,6 +73,22 @@ func (db *DB) CleanupOlderThan( return counter.Total(), nil } +// build assembles the cleanup statement for the specified database driver with the given limit. +func (stmt *CleanupStmt) build(driverName string, limit uint64) string { + switch driverName { + case database.MySQL: + return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time +ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit) + case database.PostgreSQL: + return fmt.Sprintf(`WITH rows AS ( +SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d +) +DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit) + default: + panic(fmt.Sprintf("invalid database type %s", driverName)) + } +} + type cleanupWhere struct { EnvironmentId types.Binary Time types.UnixMilli diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index 90249592..9db0fb45 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/icinga/icingadb/pkg/database" + "github.com/icinga/icingadb/pkg/icingadb" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" @@ -21,7 +22,7 @@ const ( ) type retentionStatement struct { - database.CleanupStmt + icingadb.CleanupStmt RetentionType Category string } @@ -30,7 +31,7 @@ type retentionStatement struct { var RetentionStatements = []retentionStatement{{ RetentionType: RetentionHistory, Category: "acknowledgement", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "acknowledgement_history", PK: "id", Column: "clear_time", @@ -38,7 +39,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionHistory, Category: "comment", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "comment_history", PK: "comment_id", Column: "remove_time", @@ -46,7 +47,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionHistory, Category: "downtime", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "downtime_history", PK: "downtime_id", Column: "end_time", @@ -54,7 +55,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionHistory, Category: "flapping", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "flapping_history", PK: "id", Column: "end_time", @@ -62,7 +63,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionHistory, Category: "notification", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "notification_history", PK: "id", Column: "send_time", @@ -70,7 +71,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionHistory, Category: "state", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "state_history", PK: "id", Column: "event_time", @@ -78,7 +79,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionSla, Category: "sla_downtime", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "sla_history_downtime", PK: "downtime_id", Column: "downtime_end", @@ -86,7 +87,7 @@ var RetentionStatements = []retentionStatement{{ }, { RetentionType: RetentionSla, Category: "sla_state", - CleanupStmt: database.CleanupStmt{ + CleanupStmt: icingadb.CleanupStmt{ Table: "sla_history_state", PK: "id", Column: "event_time", @@ -186,8 +187,8 @@ func (r *Retention) Start(ctx context.Context) error { r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s", stmt.Category, stmt.Table, olderThan) - deleted, err := r.db.CleanupOlderThan( - ctx, stmt.CleanupStmt, e.Id, r.count, olderThan, + deleted, err := stmt.CleanupOlderThan( + ctx, r.db, e.Id, r.count, olderThan, database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), ) if err != nil {