mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Move database related code from internal to database
This commit is contained in:
parent
c41cbf260e
commit
8beb8616ad
9 changed files with 275 additions and 272 deletions
|
|
@ -8,7 +8,6 @@ import (
|
|||
"fmt"
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/goccy/go-yaml"
|
||||
"github.com/icinga/icingadb/internal/config"
|
||||
"github.com/icinga/icingadb/pkg/database"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
|
|
@ -42,11 +41,11 @@ type Flags struct {
|
|||
// Config defines the YAML config structure.
|
||||
type Config struct {
|
||||
IDO struct {
|
||||
config.Database `yaml:"-,inline"`
|
||||
database.Config `yaml:"-,inline"`
|
||||
From int32 `yaml:"from"`
|
||||
To int32 `yaml:"to" default:"2147483647"`
|
||||
} `yaml:"ido"`
|
||||
IcingaDB config.Database `yaml:"icingadb"`
|
||||
IcingaDB database.Config `yaml:"icingadb"`
|
||||
// Icinga2 specifies information the IDO doesn't provide.
|
||||
Icinga2 struct {
|
||||
// Env specifies the environment ID, hex.
|
||||
|
|
@ -176,7 +175,7 @@ func connectAll(c *Config) (ido, idb *database.DB) {
|
|||
eg, _ := errgroup.WithContext(context.Background())
|
||||
|
||||
eg.Go(func() error {
|
||||
ido = connect("IDO", &c.IDO.Database)
|
||||
ido = connect("IDO", &c.IDO.Config)
|
||||
return nil
|
||||
})
|
||||
|
||||
|
|
@ -190,8 +189,12 @@ func connectAll(c *Config) (ido, idb *database.DB) {
|
|||
}
|
||||
|
||||
// connect connects to which DB as cfg specifies. (On non-recoverable errors the whole program exits.)
|
||||
func connect(which string, cfg *config.Database) *database.DB {
|
||||
db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second))
|
||||
func connect(which string, cfg *database.Config) *database.DB {
|
||||
db, err := database.NewDbFromConfig(
|
||||
cfg,
|
||||
logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second),
|
||||
database.RetryConnectorCallbacks{},
|
||||
)
|
||||
if err != nil {
|
||||
log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database"))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,10 +6,12 @@ import (
|
|||
"github.com/icinga/icingadb/internal/config"
|
||||
"github.com/icinga/icingadb/pkg/database"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
goflags "github.com/jessevdk/go-flags"
|
||||
"github.com/pkg/errors"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Command provides factories for creating Redis and Database connections from Config.
|
||||
|
|
@ -49,7 +51,14 @@ func New() *Command {
|
|||
|
||||
// Database creates and returns a new icingadb.DB connection from config.Config.
|
||||
func (c Command) Database(l *logging.Logger) (*database.DB, error) {
|
||||
return c.Config.Database.Open(l)
|
||||
return database.NewDbFromConfig(&c.Config.Database, l, database.RetryConnectorCallbacks{
|
||||
OnRetryableError: func(_ time.Duration, _ uint64, err, _ error) {
|
||||
telemetry.UpdateCurrentDbConnErr(err)
|
||||
},
|
||||
OnSuccess: func(_ time.Duration, _ uint64, _ error) {
|
||||
telemetry.UpdateCurrentDbConnErr(nil)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Redis creates and returns a new icingaredis.Client connection from config.Config.
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package config
|
|||
import (
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/goccy/go-yaml"
|
||||
"github.com/icinga/icingadb/pkg/database"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/jessevdk/go-flags"
|
||||
"github.com/pkg/errors"
|
||||
|
|
@ -11,10 +12,10 @@ import (
|
|||
|
||||
// Config defines Icinga DB config.
|
||||
type Config struct {
|
||||
Database Database `yaml:"database"`
|
||||
Redis Redis `yaml:"redis"`
|
||||
Logging logging.Config `yaml:"logging"`
|
||||
Retention Retention `yaml:"retention"`
|
||||
Database database.Config `yaml:"database"`
|
||||
Redis Redis `yaml:"redis"`
|
||||
Logging logging.Config `yaml:"logging"`
|
||||
Retention Retention `yaml:"retention"`
|
||||
}
|
||||
|
||||
// Validate checks constraints in the supplied configuration and returns an error if they are violated.
|
||||
|
|
|
|||
|
|
@ -1,221 +0,0 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/icinga/icingadb/pkg/config"
|
||||
"github.com/icinga/icingadb/pkg/database"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/strcase"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/jmoiron/sqlx/reflectx"
|
||||
"github.com/lib/pq"
|
||||
"github.com/pkg/errors"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Database defines database client configuration.
|
||||
type Database struct {
|
||||
Type string `yaml:"type" default:"mysql"`
|
||||
Host string `yaml:"host"`
|
||||
Port int `yaml:"port"`
|
||||
Database string `yaml:"database"`
|
||||
User string `yaml:"user"`
|
||||
Password string `yaml:"password"`
|
||||
TlsOptions config.TLS `yaml:",inline"`
|
||||
Options database.Options `yaml:"options"`
|
||||
}
|
||||
|
||||
// Open prepares the DSN string and driver configuration,
|
||||
// calls sqlx.Open, but returns *icingadb.DB.
|
||||
func (d *Database) Open(logger *logging.Logger) (*database.DB, error) {
|
||||
var db *sqlx.DB
|
||||
|
||||
connectorCallbacks := database.RetryConnectorCallbacks{
|
||||
OnRetryableError: func(_ time.Duration, _ uint64, err, _ error) {
|
||||
telemetry.UpdateCurrentDbConnErr(err)
|
||||
},
|
||||
OnSuccess: func(_ time.Duration, _ uint64, _ error) {
|
||||
telemetry.UpdateCurrentDbConnErr(nil)
|
||||
},
|
||||
}
|
||||
|
||||
switch d.Type {
|
||||
case "mysql":
|
||||
config := mysql.NewConfig()
|
||||
|
||||
config.User = d.User
|
||||
config.Passwd = d.Password
|
||||
config.Logger = database.MysqlFuncLogger(logger.Debug)
|
||||
|
||||
if utils.IsUnixAddr(d.Host) {
|
||||
config.Net = "unix"
|
||||
config.Addr = d.Host
|
||||
} else {
|
||||
config.Net = "tcp"
|
||||
port := d.Port
|
||||
if port == 0 {
|
||||
port = 3306
|
||||
}
|
||||
config.Addr = net.JoinHostPort(d.Host, fmt.Sprint(port))
|
||||
}
|
||||
|
||||
config.DBName = d.Database
|
||||
config.Timeout = time.Minute
|
||||
config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"}
|
||||
|
||||
tlsConfig, err := d.TlsOptions.MakeConfig(d.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tlsConfig != nil {
|
||||
config.TLSConfig = "icingadb"
|
||||
if err := mysql.RegisterTLSConfig(config.TLSConfig, tlsConfig); err != nil {
|
||||
return nil, errors.Wrap(err, "can't register TLS config")
|
||||
}
|
||||
}
|
||||
|
||||
c, err := mysql.NewConnector(config)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't open mysql database")
|
||||
}
|
||||
|
||||
connectorCallbacks.OnInitConn = func(ctx context.Context, conn driver.Conn) error {
|
||||
return setGaleraOpts(ctx, conn, int64(d.Options.WsrepSyncWait))
|
||||
}
|
||||
|
||||
db = sqlx.NewDb(sql.OpenDB(database.NewConnector(c, logger, connectorCallbacks)), database.MySQL)
|
||||
case "pgsql":
|
||||
uri := &url.URL{
|
||||
Scheme: "postgres",
|
||||
User: url.UserPassword(d.User, d.Password),
|
||||
Path: "/" + url.PathEscape(d.Database),
|
||||
}
|
||||
|
||||
query := url.Values{
|
||||
"connect_timeout": {"60"},
|
||||
"binary_parameters": {"yes"},
|
||||
|
||||
// Host and port can alternatively be specified in the query string. lib/pq can't parse the connection URI
|
||||
// if a Unix domain socket path is specified in the host part of the URI, therefore always use the query
|
||||
// string. See also https://github.com/lib/pq/issues/796
|
||||
"host": {d.Host},
|
||||
}
|
||||
if d.Port != 0 {
|
||||
query["port"] = []string{strconv.FormatInt(int64(d.Port), 10)}
|
||||
}
|
||||
|
||||
if _, err := d.TlsOptions.MakeConfig(d.Host); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if d.TlsOptions.Enable {
|
||||
if d.TlsOptions.Insecure {
|
||||
query["sslmode"] = []string{"require"}
|
||||
} else {
|
||||
query["sslmode"] = []string{"verify-full"}
|
||||
}
|
||||
|
||||
if d.TlsOptions.Cert != "" {
|
||||
query["sslcert"] = []string{d.TlsOptions.Cert}
|
||||
}
|
||||
|
||||
if d.TlsOptions.Key != "" {
|
||||
query["sslkey"] = []string{d.TlsOptions.Key}
|
||||
}
|
||||
|
||||
if d.TlsOptions.Ca != "" {
|
||||
query["sslrootcert"] = []string{d.TlsOptions.Ca}
|
||||
}
|
||||
} else {
|
||||
query["sslmode"] = []string{"disable"}
|
||||
}
|
||||
|
||||
uri.RawQuery = query.Encode()
|
||||
|
||||
connector, err := pq.NewConnector(uri.String())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't open pgsql database")
|
||||
}
|
||||
|
||||
db = sqlx.NewDb(sql.OpenDB(database.NewConnector(connector, logger, connectorCallbacks)), database.PostgreSQL)
|
||||
default:
|
||||
return nil, unknownDbType(d.Type)
|
||||
}
|
||||
|
||||
db.SetMaxIdleConns(d.Options.MaxConnections / 3)
|
||||
db.SetMaxOpenConns(d.Options.MaxConnections)
|
||||
|
||||
db.Mapper = reflectx.NewMapperFunc("db", strcase.Snake)
|
||||
|
||||
return database.NewDb(db, logger, &d.Options), nil
|
||||
}
|
||||
|
||||
// Validate checks constraints in the supplied database configuration and returns an error if they are violated.
|
||||
func (d *Database) Validate() error {
|
||||
switch d.Type {
|
||||
case "mysql", "pgsql":
|
||||
default:
|
||||
return unknownDbType(d.Type)
|
||||
}
|
||||
|
||||
if d.Host == "" {
|
||||
return errors.New("database host missing")
|
||||
}
|
||||
|
||||
if d.User == "" {
|
||||
return errors.New("database user missing")
|
||||
}
|
||||
|
||||
if d.Database == "" {
|
||||
return errors.New("database name missing")
|
||||
}
|
||||
|
||||
return d.Options.Validate()
|
||||
}
|
||||
|
||||
func unknownDbType(t string) error {
|
||||
return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t)
|
||||
}
|
||||
|
||||
// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed
|
||||
// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key
|
||||
// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes,
|
||||
// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped.
|
||||
//
|
||||
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
|
||||
func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error {
|
||||
const galeraOpts = "SET SESSION wsrep_sync_wait=?"
|
||||
|
||||
stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts)
|
||||
if err != nil {
|
||||
if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "cannot prepare "+galeraOpts)
|
||||
}
|
||||
// This is just for an unexpected exit and any returned error can safely be ignored and in case
|
||||
// of the normal function exit, the stmt is closed manually, and its error is handled gracefully.
|
||||
defer func() { _ = stmt.Close() }()
|
||||
|
||||
_, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot execute "+galeraOpts)
|
||||
}
|
||||
|
||||
if err = stmt.Close(); err != nil {
|
||||
return errors.Wrap(err, "cannot close prepared statement "+galeraOpts)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
45
pkg/database/config.go
Normal file
45
pkg/database/config.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/config"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Config defines database client configuration.
|
||||
type Config struct {
|
||||
Type string `yaml:"type" default:"mysql"`
|
||||
Host string `yaml:"host"`
|
||||
Port int `yaml:"port"`
|
||||
Database string `yaml:"database"`
|
||||
User string `yaml:"user"`
|
||||
Password string `yaml:"password"`
|
||||
TlsOptions config.TLS `yaml:",inline"`
|
||||
Options Options `yaml:"options"`
|
||||
}
|
||||
|
||||
// Validate checks constraints in the supplied database configuration and returns an error if they are violated.
|
||||
func (c *Config) Validate() error {
|
||||
switch c.Type {
|
||||
case "mysql", "pgsql":
|
||||
default:
|
||||
return unknownDbType(c.Type)
|
||||
}
|
||||
|
||||
if c.Host == "" {
|
||||
return errors.New("database host missing")
|
||||
}
|
||||
|
||||
if c.User == "" {
|
||||
return errors.New("database user missing")
|
||||
}
|
||||
|
||||
if c.Database == "" {
|
||||
return errors.New("database name missing")
|
||||
}
|
||||
|
||||
return c.Options.Validate()
|
||||
}
|
||||
|
||||
func unknownDbType(t string) error {
|
||||
return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t)
|
||||
}
|
||||
|
|
@ -2,18 +2,28 @@ package database
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"github.com/icinga/icingadb/pkg/strcase"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/jmoiron/sqlx/reflectx"
|
||||
"github.com/lib/pq"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"net"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -80,7 +90,7 @@ func (o *Options) Validate() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
|
||||
// NewDb returns a new DB wrapper for a pre-existing sqlx.DB.
|
||||
func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
|
||||
return &DB{
|
||||
DB: db,
|
||||
|
|
@ -90,6 +100,124 @@ func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
|
|||
}
|
||||
}
|
||||
|
||||
// NewDbFromConfig returns a new DB from Config.
|
||||
func NewDbFromConfig(c *Config, logger *logging.Logger, connectorCallbacks RetryConnectorCallbacks) (*DB, error) {
|
||||
var db *sqlx.DB
|
||||
|
||||
switch c.Type {
|
||||
case "mysql":
|
||||
config := mysql.NewConfig()
|
||||
|
||||
config.User = c.User
|
||||
config.Passwd = c.Password
|
||||
config.Logger = MysqlFuncLogger(logger.Debug)
|
||||
|
||||
if utils.IsUnixAddr(c.Host) {
|
||||
config.Net = "unix"
|
||||
config.Addr = c.Host
|
||||
} else {
|
||||
config.Net = "tcp"
|
||||
port := c.Port
|
||||
if port == 0 {
|
||||
port = 3306
|
||||
}
|
||||
config.Addr = net.JoinHostPort(c.Host, fmt.Sprint(port))
|
||||
}
|
||||
|
||||
config.DBName = c.Database
|
||||
config.Timeout = time.Minute
|
||||
config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"}
|
||||
|
||||
tlsConfig, err := c.TlsOptions.MakeConfig(c.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config.TLS = tlsConfig
|
||||
|
||||
connector, err := mysql.NewConnector(config)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't open mysql database")
|
||||
}
|
||||
|
||||
onInitConn := connectorCallbacks.OnInitConn
|
||||
connectorCallbacks.OnInitConn = func(ctx context.Context, conn driver.Conn) error {
|
||||
if onInitConn != nil {
|
||||
if err := onInitConn(ctx, conn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return setGaleraOpts(ctx, conn, int64(c.Options.WsrepSyncWait))
|
||||
}
|
||||
|
||||
db = sqlx.NewDb(sql.OpenDB(NewConnector(connector, logger, connectorCallbacks)), MySQL)
|
||||
case "pgsql":
|
||||
uri := &url.URL{
|
||||
Scheme: "postgres",
|
||||
User: url.UserPassword(c.User, c.Password),
|
||||
Path: "/" + url.PathEscape(c.Database),
|
||||
}
|
||||
|
||||
query := url.Values{
|
||||
"connect_timeout": {"60"},
|
||||
"binary_parameters": {"yes"},
|
||||
|
||||
// Host and port can alternatively be specified in the query string. lib/pq can't parse the connection URI
|
||||
// if a Unix domain socket path is specified in the host part of the URI, therefore always use the query
|
||||
// string. See also https://github.com/lib/pq/issues/796
|
||||
"host": {c.Host},
|
||||
}
|
||||
if c.Port != 0 {
|
||||
query["port"] = []string{strconv.FormatInt(int64(c.Port), 10)}
|
||||
}
|
||||
|
||||
if _, err := c.TlsOptions.MakeConfig(c.Host); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c.TlsOptions.Enable {
|
||||
if c.TlsOptions.Insecure {
|
||||
query["sslmode"] = []string{"require"}
|
||||
} else {
|
||||
query["sslmode"] = []string{"verify-full"}
|
||||
}
|
||||
|
||||
if c.TlsOptions.Cert != "" {
|
||||
query["sslcert"] = []string{c.TlsOptions.Cert}
|
||||
}
|
||||
|
||||
if c.TlsOptions.Key != "" {
|
||||
query["sslkey"] = []string{c.TlsOptions.Key}
|
||||
}
|
||||
|
||||
if c.TlsOptions.Ca != "" {
|
||||
query["sslrootcert"] = []string{c.TlsOptions.Ca}
|
||||
}
|
||||
} else {
|
||||
query["sslmode"] = []string{"disable"}
|
||||
}
|
||||
|
||||
uri.RawQuery = query.Encode()
|
||||
|
||||
connector, err := pq.NewConnector(uri.String())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't open pgsql database")
|
||||
}
|
||||
|
||||
db = sqlx.NewDb(sql.OpenDB(NewConnector(connector, logger, connectorCallbacks)), PostgreSQL)
|
||||
default:
|
||||
return nil, unknownDbType(c.Type)
|
||||
}
|
||||
|
||||
db.SetMaxIdleConns(c.Options.MaxConnections / 3)
|
||||
db.SetMaxOpenConns(c.Options.MaxConnections)
|
||||
|
||||
db.Mapper = reflectx.NewMapperFunc("db", strcase.Snake)
|
||||
|
||||
return NewDb(db, logger, &c.Options), nil
|
||||
}
|
||||
|
||||
// BuildColumns returns all columns of the given struct.
|
||||
func (db *DB) BuildColumns(subject interface{}) []string {
|
||||
fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names
|
||||
|
|
@ -265,7 +393,7 @@ func (db *DB) BulkExec(
|
|||
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
|
||||
) error {
|
||||
var counter com.Counter
|
||||
defer db.log(ctx, query, &counter).Stop()
|
||||
defer db.Log(ctx, query, &counter).Stop()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
// Use context from group.
|
||||
|
|
@ -333,7 +461,7 @@ func (db *DB) NamedBulkExec(
|
|||
splitPolicyFactory com.BulkChunkSplitPolicyFactory[Entity], onSuccess ...OnSuccess[Entity],
|
||||
) error {
|
||||
var counter com.Counter
|
||||
defer db.log(ctx, query, &counter).Stop()
|
||||
defer db.Log(ctx, query, &counter).Stop()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
bulk := com.Bulk(ctx, arg, count, splitPolicyFactory)
|
||||
|
|
@ -397,7 +525,7 @@ func (db *DB) NamedBulkExecTx(
|
|||
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan Entity,
|
||||
) error {
|
||||
var counter com.Counter
|
||||
defer db.log(ctx, query, &counter).Stop()
|
||||
defer db.Log(ctx, query, &counter).Stop()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
bulk := com.Bulk(ctx, arg, count, com.NeverSplit[Entity])
|
||||
|
|
@ -480,7 +608,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc EntityFactoryFunc, query
|
|||
|
||||
g.Go(func() error {
|
||||
var counter com.Counter
|
||||
defer db.log(ctx, query, &counter).Stop()
|
||||
defer db.Log(ctx, query, &counter).Stop()
|
||||
defer close(entities)
|
||||
|
||||
rows, err := db.NamedQueryContext(ctx, query, scope)
|
||||
|
|
@ -652,7 +780,7 @@ func (db *DB) GetDefaultRetrySettings() retry.Settings {
|
|||
}
|
||||
}
|
||||
|
||||
func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper {
|
||||
func (db *DB) Log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper {
|
||||
return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
db.logger.Debugf("Executed %q with %d rows", query, count)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/strcase"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
|
|
@ -40,6 +43,39 @@ func SplitOnDupId[T IDer]() com.BulkChunkSplitPolicy[T] {
|
|||
}
|
||||
}
|
||||
|
||||
// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed
|
||||
// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key
|
||||
// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes,
|
||||
// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped.
|
||||
//
|
||||
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
|
||||
func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error {
|
||||
const galeraOpts = "SET SESSION wsrep_sync_wait=?"
|
||||
|
||||
stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts)
|
||||
if err != nil {
|
||||
if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "cannot prepare "+galeraOpts)
|
||||
}
|
||||
// This is just for an unexpected exit and any returned error can safely be ignored and in case
|
||||
// of the normal function exit, the stmt is closed manually, and its error is handled gracefully.
|
||||
defer func() { _ = stmt.Close() }()
|
||||
|
||||
_, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot execute "+galeraOpts)
|
||||
}
|
||||
|
||||
if err = stmt.Close(); err != nil {
|
||||
return errors.Wrap(err, "cannot close prepared statement "+galeraOpts)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
_ com.BulkChunkSplitPolicyFactory[Entity] = SplitOnDupId[Entity]
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
package database
|
||||
package icingadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/database"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"time"
|
||||
|
|
@ -17,34 +18,18 @@ type CleanupStmt struct {
|
|||
Column string
|
||||
}
|
||||
|
||||
// Build assembles the cleanup statement for the specified database driver with the given limit.
|
||||
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
|
||||
switch driverName {
|
||||
case MySQL:
|
||||
return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
|
||||
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
|
||||
case PostgreSQL:
|
||||
return fmt.Sprintf(`WITH rows AS (
|
||||
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
|
||||
)
|
||||
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid database type %s", driverName))
|
||||
}
|
||||
}
|
||||
|
||||
// CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
|
||||
// Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess.
|
||||
// Returns the total number of rows deleted.
|
||||
func (db *DB) CleanupOlderThan(
|
||||
ctx context.Context, stmt CleanupStmt, envId types.Binary,
|
||||
count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
|
||||
func (stmt *CleanupStmt) CleanupOlderThan(
|
||||
ctx context.Context, db *database.DB, envId types.Binary,
|
||||
count uint64, olderThan time.Time, onSuccess ...database.OnSuccess[struct{}],
|
||||
) (uint64, error) {
|
||||
var counter com.Counter
|
||||
|
||||
q := db.Rebind(stmt.Build(db.DriverName(), count))
|
||||
q := db.Rebind(stmt.build(db.DriverName(), count))
|
||||
|
||||
defer db.log(ctx, q, &counter).Stop()
|
||||
defer db.Log(ctx, q, &counter).Stop()
|
||||
|
||||
for {
|
||||
var rowsDeleted int64
|
||||
|
|
@ -57,7 +42,7 @@ func (db *DB) CleanupOlderThan(
|
|||
Time: types.UnixMilli(olderThan),
|
||||
})
|
||||
if err != nil {
|
||||
return CantPerformQuery(err, q)
|
||||
return database.CantPerformQuery(err, q)
|
||||
}
|
||||
|
||||
rowsDeleted, err = rs.RowsAffected()
|
||||
|
|
@ -88,6 +73,22 @@ func (db *DB) CleanupOlderThan(
|
|||
return counter.Total(), nil
|
||||
}
|
||||
|
||||
// build assembles the cleanup statement for the specified database driver with the given limit.
|
||||
func (stmt *CleanupStmt) build(driverName string, limit uint64) string {
|
||||
switch driverName {
|
||||
case database.MySQL:
|
||||
return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
|
||||
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
|
||||
case database.PostgreSQL:
|
||||
return fmt.Sprintf(`WITH rows AS (
|
||||
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
|
||||
)
|
||||
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid database type %s", driverName))
|
||||
}
|
||||
}
|
||||
|
||||
type cleanupWhere struct {
|
||||
EnvironmentId types.Binary
|
||||
Time types.UnixMilli
|
||||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"github.com/icinga/icingadb/pkg/database"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
|
|
@ -21,7 +22,7 @@ const (
|
|||
)
|
||||
|
||||
type retentionStatement struct {
|
||||
database.CleanupStmt
|
||||
icingadb.CleanupStmt
|
||||
RetentionType
|
||||
Category string
|
||||
}
|
||||
|
|
@ -30,7 +31,7 @@ type retentionStatement struct {
|
|||
var RetentionStatements = []retentionStatement{{
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "acknowledgement",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "acknowledgement_history",
|
||||
PK: "id",
|
||||
Column: "clear_time",
|
||||
|
|
@ -38,7 +39,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "comment",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "comment_history",
|
||||
PK: "comment_id",
|
||||
Column: "remove_time",
|
||||
|
|
@ -46,7 +47,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "downtime",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "downtime_history",
|
||||
PK: "downtime_id",
|
||||
Column: "end_time",
|
||||
|
|
@ -54,7 +55,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "flapping",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "flapping_history",
|
||||
PK: "id",
|
||||
Column: "end_time",
|
||||
|
|
@ -62,7 +63,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "notification",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "notification_history",
|
||||
PK: "id",
|
||||
Column: "send_time",
|
||||
|
|
@ -70,7 +71,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "state",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "state_history",
|
||||
PK: "id",
|
||||
Column: "event_time",
|
||||
|
|
@ -78,7 +79,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionSla,
|
||||
Category: "sla_downtime",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "sla_history_downtime",
|
||||
PK: "downtime_id",
|
||||
Column: "downtime_end",
|
||||
|
|
@ -86,7 +87,7 @@ var RetentionStatements = []retentionStatement{{
|
|||
}, {
|
||||
RetentionType: RetentionSla,
|
||||
Category: "sla_state",
|
||||
CleanupStmt: database.CleanupStmt{
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "sla_history_state",
|
||||
PK: "id",
|
||||
Column: "event_time",
|
||||
|
|
@ -186,8 +187,8 @@ func (r *Retention) Start(ctx context.Context) error {
|
|||
r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
|
||||
stmt.Category, stmt.Table, olderThan)
|
||||
|
||||
deleted, err := r.db.CleanupOlderThan(
|
||||
ctx, stmt.CleanupStmt, e.Id, r.count, olderThan,
|
||||
deleted, err := stmt.CleanupOlderThan(
|
||||
ctx, r.db, e.Id, r.count, olderThan,
|
||||
database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue