Merge pull request #221 from Icinga/feature/postgres-136

Support PostgreSQL
This commit is contained in:
Julian Brost 2022-03-15 14:35:01 +01:00 committed by GitHub
commit 89bcc99071
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 2251 additions and 65 deletions

View file

@ -4,7 +4,7 @@ use warnings;
use strict;
use autodie qw(:all);
if (/^ ?Copyright / || /^All rights reserved\.$/ || /^(?:The )?\S+ License(?: \(.+?\))?$/ || /^$/) {
if (/^ ?(?:\w+ )?Copyright / || /^All rights reserved\.$/ || /^(?:The )?\S+ License(?: \(.+?\))?$/ || /^$/) {
$_ = ""
}

View file

@ -13,9 +13,10 @@
## About
Icinga DB serves as a synchronisation daemon between Icinga 2 (Redis) and Icinga Web 2 (MySQL). It synchronises configuration, state and history of an Icinga 2 environment using checksums.
Icinga DB serves as a synchronisation daemon between Icinga 2 (Redis) and Icinga Web 2 (MySQL/MariaDB/PostgreSQL database).
It synchronises configuration, state and history of an Icinga 2 environment using checksums.
Icinga DB also supports reading from multiple environments and writing into a single MySQL instance.
Icinga DB also supports reading from multiple environments and writing into a single database.
## License

View file

@ -5,6 +5,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal/command"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingadb/history"
"github.com/icinga/icingadb/pkg/icingadb/overdue"
@ -24,10 +25,11 @@ import (
)
const (
ExitSuccess = 0
ExitFailure = 1
expectedRedisSchemaVersion = "4"
expectedDbSchemaVersion = 3
ExitSuccess = 0
ExitFailure = 1
expectedRedisSchemaVersion = "4"
expectedMysqlSchemaVersion = 3
expectedPostgresSchemaVersion = 1
)
func main() {
@ -317,6 +319,14 @@ func run() int {
// checkDbSchema asserts the database schema of the expected version being present.
func checkDbSchema(ctx context.Context, db *icingadb.DB) error {
var expectedDbSchemaVersion uint16
switch db.DriverName() {
case driver.MySQL:
expectedDbSchemaVersion = expectedMysqlSchemaVersion
case driver.PostgreSQL:
expectedDbSchemaVersion = expectedPostgresSchemaVersion
}
var version uint16
err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version)

View file

@ -1,6 +1,7 @@
# This is the configuration file for Icinga DB.
database:
type: mysql
host: localhost
port: 3306
database: icingadb

View file

@ -2,6 +2,7 @@
![Icinga DB Context](images/about/icinga-db-in-icinga-context.png)
Icinga DB serves as a synchronisation daemon between Icinga 2 (Redis) and Icinga Web 2 (MySQL). It synchronises configuration, volatile states and history of an Icinga 2 environment using checksums.
Icinga DB serves as a synchronisation daemon between Icinga 2 (Redis) and Icinga Web 2 (MySQL/MariaDB/PostgreSQL database).
It synchronises configuration, volatile states and history of an Icinga 2 environment using checksums.
Icinga DB also supports reading from multiple environments and writing into a single MySQL instance.
Icinga DB also supports reading from multiple environments and writing into a single database.

View file

@ -3,7 +3,7 @@
## Requirements <a id="installation-requirements"></a>
* Local Redis instance (Will be installed during this documentation)
* MySQL/MariaDB database `icingadb`, user and schema imports (Will be set up during this documentation)
* MySQL/MariaDB/PostgreSQL database `icingadb`, user and schema imports (Will be set up during this documentation)
## Setting up Icinga DB <a id="setting-up-icingadb"></a>
@ -133,7 +133,11 @@ Debian/Ubuntu:
apt-get install icingadb-redis
```
### Setting up the MySQL database <a id="setting-up-mysql-db"></a>
### Setting up the Database <a id="setting-up-db"></a>
A MySQL/MariaDB or PostgreSQL database is required.
#### MySQL/MariaDB <a id="setting-up-mysql-db"></a>
Note that if you're using a version of MySQL < 5.7 or MariaDB < 10.2, the following server options must be set:
@ -159,6 +163,42 @@ After creating the database, you can import the Icinga DB schema using the follo
mysql -u root -p icingadb </usr/share/icingadb/schema/mysql/schema.sql
```
#### PostgreSQL <a id="setting-up-pgsql-db"></a>
Set up a PostgreSQL database for Icinga DB:
```
# su -l postgres
createuser -P icingadb
createdb -E UTF8 --locale en_US.UTF-8 -T template0 -O icingadb icingadb
psql icingadb <<<'CREATE EXTENSION IF NOT EXISTS citext;'
```
The CREATE EXTENSION command requires the postgresql-contrib package.
(On RHEL/CentOS 7: rh-postgresql95-postgresql-contrib)
Edit `pg_hba.conf`, insert the following before everything else:
```
local all icingadb md5
host all icingadb 0.0.0.0/0 md5
host all icingadb ::/0 md5
```
To apply those changes, run `systemctl reload postgresql`.
(On RHEL/CentOS 7 the service is called "rh-postgresql95-postgresql".)
After creating the database you can import the Icinga DB schema using the
following command. Enter the password when asked.
```
psql -U icingadb icingadb < /usr/share/icingadb/schema/pgsql/schema.sql
```
On RHEL/CentOS 7 prefix "createuser", "createdb" and "psql" with
"/opt/rh/rh-postgresql95/root/usr/bin/".
### Running Icinga DB <a id="running-icingadb"></a>
Foreground:

View file

@ -25,6 +25,7 @@ Configuration of the database used by Icinga DB.
Option | Description
-------------------------|-----------------------------------------------
type | **Optional.** Either `mysql` (default) or `pgsql`.
host | **Required.** Database host or absolute Unix socket path.
port | **Required.** Database port.
database | **Required.** Database database.

1
go.mod
View file

@ -10,6 +10,7 @@ require (
github.com/google/uuid v1.3.0
github.com/jessevdk/go-flags v1.5.0
github.com/jmoiron/sqlx v1.3.4
github.com/lib/pq v1.10.3
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
github.com/pkg/errors v0.9.1
github.com/ssgreg/journald v1.0.0

3
go.sum
View file

@ -64,8 +64,9 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=

View file

@ -8,6 +8,42 @@ import (
"time"
)
// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles.
// A call takes an item for the current chunk into account.
// Output true indicates that the state machine was reset first and the bulker
// shall finish the current chunk now (not e.g. once $size is reached) without the given item.
type BulkChunkSplitPolicy func(contracts.Entity) bool
type BulkChunkSplitPolicyFactory func() BulkChunkSplitPolicy
// NeverSplit returns a pseudo state machine which never demands splitting.
func NeverSplit() BulkChunkSplitPolicy {
return neverSplit
}
// SplitOnDupId returns a state machine which tracks the inputs' IDs.
// Once an already seen input arrives, it demands splitting.
func SplitOnDupId() BulkChunkSplitPolicy {
seenIds := map[string]struct{}{}
return func(entity contracts.Entity) bool {
id := entity.ID().String()
_, ok := seenIds[id]
if ok {
seenIds = map[string]struct{}{id: {}}
} else {
seenIds[id] = struct{}{}
}
return ok
}
}
func neverSplit(contracts.Entity) bool {
return false
}
// EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel.
type EntityBulker struct {
ch chan []contracts.Entity
@ -16,14 +52,16 @@ type EntityBulker struct {
}
// NewEntityBulker returns a new EntityBulker and starts streaming.
func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker {
func NewEntityBulker(
ctx context.Context, ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory,
) *EntityBulker {
b := &EntityBulker{
ch: make(chan []contracts.Entity),
ctx: ctx,
mu: sync.Mutex{},
}
go b.run(ch, count)
go b.run(ch, count, splitPolicyFactory)
return b
}
@ -33,10 +71,11 @@ func (b *EntityBulker) Bulk() <-chan []contracts.Entity {
return b.ch
}
func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) {
func (b *EntityBulker) run(ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory) {
defer close(b.ch)
bufCh := make(chan contracts.Entity, count)
splitPolicy := splitPolicyFactory()
g, ctx := errgroup.WithContext(b.ctx)
g.Go(func() error {
@ -71,6 +110,15 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) {
break
}
if splitPolicy(v) {
if len(buf) > 0 {
b.ch <- buf
buf = make([]contracts.Entity, 0, count)
}
timeout = time.After(256 * time.Millisecond)
}
buf = append(buf, v)
case <-timeout:
drain = false
@ -82,6 +130,8 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) {
if len(buf) > 0 {
b.ch <- buf
}
splitPolicy = splitPolicyFactory()
}
return nil
@ -93,16 +143,18 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) {
}
// BulkEntities reads all entities from a channel and streams them in chunks into a returned channel.
func BulkEntities(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity {
func BulkEntities(
ctx context.Context, ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory,
) <-chan []contracts.Entity {
if count <= 1 {
return oneEntityBulk(ctx, ch)
}
return NewEntityBulker(ctx, ch, count).Bulk()
return NewEntityBulker(ctx, ch, count, splitPolicyFactory).Bulk()
}
// oneEntityBulk operates just as NewEntityBulker(ctx, ch, 1).Bulk(),
// but without the overhead of the actual bulk creation with a buffer channel and timeout.
// oneEntityBulk operates just as NewEntityBulker(ctx, ch, 1, splitPolicy).Bulk(),
// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy.
func oneEntityBulk(ctx context.Context, ch <-chan contracts.Entity) <-chan []contracts.Entity {
out := make(chan []contracts.Entity)
go func() {
@ -124,3 +176,8 @@ func oneEntityBulk(ctx context.Context, ch <-chan contracts.Entity) <-chan []con
return out
}
var (
_ BulkChunkSplitPolicyFactory = NeverSplit
_ BulkChunkSplitPolicyFactory = SplitOnDupId
)

View file

@ -11,6 +11,8 @@ import (
"github.com/jmoiron/sqlx/reflectx"
"github.com/pkg/errors"
"net"
"net/url"
"strconv"
"sync"
"time"
)
@ -19,6 +21,7 @@ var registerDriverOnce sync.Once
// Database defines database client configuration.
type Database struct {
Type string `yaml:"type" default:"mysql"`
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
@ -35,30 +38,74 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
driver.Register(logger)
})
config := mysql.NewConfig()
var dsn string
switch d.Type {
case "mysql":
config := mysql.NewConfig()
config.User = d.User
config.Passwd = d.Password
config.Net = "tcp"
config.Addr = net.JoinHostPort(d.Host, fmt.Sprint(d.Port))
config.DBName = d.Database
config.Timeout = time.Minute
config.User = d.User
config.Passwd = d.Password
config.Net = "tcp"
config.Addr = net.JoinHostPort(d.Host, fmt.Sprint(d.Port))
config.DBName = d.Database
config.Timeout = time.Minute
config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"}
tlsConfig, err := d.TlsOptions.MakeConfig(config.Addr)
if err != nil {
return nil, err
}
if tlsConfig != nil {
config.TLSConfig = "icingadb"
if err := mysql.RegisterTLSConfig(config.TLSConfig, tlsConfig); err != nil {
return nil, errors.Wrap(err, "can't register TLS config")
tlsConfig, err := d.TlsOptions.MakeConfig(config.Addr)
if err != nil {
return nil, err
}
if tlsConfig != nil {
config.TLSConfig = "icingadb"
if err := mysql.RegisterTLSConfig(config.TLSConfig, tlsConfig); err != nil {
return nil, errors.Wrap(err, "can't register TLS config")
}
}
dsn = config.FormatDSN()
case "pgsql":
uri := &url.URL{
Scheme: "postgres",
User: url.UserPassword(d.User, d.Password),
Host: net.JoinHostPort(d.Host, strconv.FormatInt(int64(d.Port), 10)),
Path: "/" + url.PathEscape(d.Database),
}
if _, err := d.TlsOptions.MakeConfig(uri.Host); err != nil {
return nil, err
}
query := url.Values{"connect_timeout": {"60"}, "binary_parameters": {"yes"}}
if d.TlsOptions.Enable {
if d.TlsOptions.Insecure {
query["sslmode"] = []string{"require"}
} else {
query["sslmode"] = []string{"verify-full"}
}
if d.TlsOptions.Cert != "" {
query["sslcert"] = []string{d.TlsOptions.Cert}
}
if d.TlsOptions.Key != "" {
query["sslkey"] = []string{d.TlsOptions.Key}
}
if d.TlsOptions.Ca != "" {
query["sslrootcert"] = []string{d.TlsOptions.Ca}
}
} else {
query["sslmode"] = []string{"disable"}
}
uri.RawQuery = query.Encode()
dsn = uri.String()
default:
return nil, unknownDbType(d.Type)
}
dsn := config.FormatDSN()
db, err := sqlx.Open("icingadb-mysql", dsn)
db, err := sqlx.Open("icingadb-"+d.Type, dsn)
if err != nil {
return nil, errors.Wrap(err, "can't open database")
}
@ -75,5 +122,15 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
// Validate checks constraints in the supplied database configuration and returns an error if they are violated.
func (d *Database) Validate() error {
switch d.Type {
case "mysql", "pgsql":
default:
return unknownDbType(d.Type)
}
return d.Options.Validate()
}
func unknownDbType(t string) error {
return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t)
}

View file

@ -8,12 +8,16 @@ import (
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/retry"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
"syscall"
"time"
)
const MySQL = "icingadb-mysql"
const PostgreSQL = "icingadb-pgsql"
var timeout = time.Minute * 5
// RetryConnector wraps driver.Connector with retry logic.
@ -75,10 +79,12 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {
}, nil
}
// Register makes our database Driver available under the name "icingadb-mysql".
// Register makes our database Driver available under the name "icingadb-*sql".
func Register(logger *logging.Logger) {
sql.Register("icingadb-mysql", &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger})
sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger})
sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger})
_ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) }))
sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR)
}
// ctxDriver helps ensure that we only support drivers that implement driver.Driver and driver.DriverContext.

22
pkg/driver/pgsql.go Normal file
View file

@ -0,0 +1,22 @@
package driver
import (
"database/sql/driver"
"github.com/lib/pq"
)
// PgSQLDriver extends pq.Driver with driver.DriverContext compliance.
type PgSQLDriver struct {
pq.Driver
}
// Assert interface compliance.
var (
_ driver.Driver = PgSQLDriver{}
_ driver.DriverContext = PgSQLDriver{}
)
// OpenConnector implements the driver.DriverContext interface.
func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error) {
return pq.NewConnector(name)
}

View file

@ -2,18 +2,20 @@ package icingadb
import (
"context"
"database/sql/driver"
sqlDriver "database/sql/driver"
"fmt"
"github.com/go-sql-driver/mysql"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
@ -102,7 +104,7 @@ func (db *DB) BuildColumns(subject interface{}) []string {
// BuildDeleteStmt returns a DELETE statement for the given struct.
func (db *DB) BuildDeleteStmt(from interface{}) string {
return fmt.Sprintf(
`DELETE FROM %s WHERE id IN (?)`,
`DELETE FROM "%s" WHERE id IN (?)`,
utils.TableName(from),
)
}
@ -112,7 +114,7 @@ func (db *DB) BuildInsertStmt(into interface{}) (string, int) {
columns := db.BuildColumns(into)
return fmt.Sprintf(
`INSERT INTO %s (%s) VALUES (%s)`,
`INSERT INTO "%s" (%s) VALUES (%s)`,
utils.TableName(into),
strings.Join(columns, ", "),
fmt.Sprintf(":%s", strings.Join(columns, ", :")),
@ -122,14 +124,24 @@ func (db *DB) BuildInsertStmt(into interface{}) (string, int) {
// BuildInsertIgnoreStmt returns an INSERT statement for the specified struct for
// which the database ignores rows that have already been inserted.
func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) {
table := utils.TableName(into)
columns := db.BuildColumns(into)
var clause string
switch db.DriverName() {
case driver.MySQL:
// MySQL treats UPDATE id = id as a no-op.
clause = "ON DUPLICATE KEY UPDATE id = id"
case driver.PostgreSQL:
clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table)
}
return fmt.Sprintf(
// MySQL treats UPDATE id = id as a no-op.
`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE id = id`,
utils.TableName(into),
`INSERT INTO "%s" (%s) VALUES (%s) %s`,
table,
strings.Join(columns, ", "),
fmt.Sprintf(":%s", strings.Join(columns, ", :")),
clause,
), len(columns)
}
@ -137,7 +149,7 @@ func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) {
// and the column list from the specified columns struct.
func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string {
q := fmt.Sprintf(
`SELECT %s FROM %s`,
`SELECT %s FROM "%s"`,
strings.Join(db.BuildColumns(columns), ", "),
utils.TableName(table),
)
@ -160,7 +172,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) {
}
return fmt.Sprintf(
`UPDATE %s SET %s WHERE id = :id`,
`UPDATE "%s" SET %s WHERE id = :id`,
utils.TableName(update),
strings.Join(set, ", "),
), len(columns) + 1 // +1 because of WHERE id = :id
@ -169,6 +181,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) {
// BuildUpsertStmt returns an upsert statement for the given struct.
func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) {
insertColumns := db.BuildColumns(subject)
table := utils.TableName(subject)
var updateColumns []string
if upserter, ok := subject.(contracts.Upserter); ok {
@ -177,17 +190,28 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in
updateColumns = insertColumns
}
var clause, setFormat string
switch db.DriverName() {
case driver.MySQL:
clause = "ON DUPLICATE KEY UPDATE"
setFormat = "%[1]s = VALUES(%[1]s)"
case driver.PostgreSQL:
clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO UPDATE SET", table)
setFormat = "%[1]s = EXCLUDED.%[1]s"
}
set := make([]string, 0, len(updateColumns))
for _, col := range updateColumns {
set = append(set, fmt.Sprintf("%s = VALUES(%s)", col, col))
set = append(set, fmt.Sprintf(setFormat, col))
}
return fmt.Sprintf(
`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`,
utils.TableName(subject),
`INSERT INTO "%s" (%s) VALUES (%s) %s %s`,
table,
strings.Join(insertColumns, ","),
fmt.Sprintf(":%s", strings.Join(insertColumns, ",:")),
clause,
strings.Join(set, ","),
), len(insertColumns)
}
@ -281,13 +305,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
// Entities for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory,
) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count)
bulk := com.BulkEntities(ctx, arg, count, splitPolicyFactory)
g.Go(func() error {
for {
@ -355,7 +379,7 @@ func (db *DB) NamedBulkExecTx(
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count)
bulk := com.BulkEntities(ctx, arg, count, com.NeverSplit)
g.Go(func() error {
for {
@ -478,7 +502,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildInsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit)
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
@ -494,7 +518,9 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildUpsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded, com.SplitOnDupId,
)
}
// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx.
@ -559,7 +585,7 @@ func (db *DB) log(ctx context.Context, query string, counter *com.Counter) perio
// IsRetryable checks whether the given error is retryable.
func IsRetryable(err error) bool {
if errors.Is(err, driver.ErrBadConn) {
if errors.Is(err, sqlDriver.ErrBadConn) {
return true
}
@ -576,6 +602,35 @@ func IsRetryable(err error) bool {
// 1213: Deadlock found when trying to get lock
// 2006: MySQL server has gone away
return true
default:
return false
}
}
var pe *pq.Error
if errors.As(err, &pe) {
switch pe.Code {
case "08000", // connection_exception
"08006", // connection_failure
"08001", // sqlclient_unable_to_establish_sqlconnection
"08004", // sqlserver_rejected_establishment_of_sqlconnection
"40001", // serialization_failure
"40P01", // deadlock_detected
"54000", // program_limit_exceeded
"55006", // object_in_use
"55P03", // lock_not_available
"57P01", // admin_shutdown
"57P02", // crash_shutdown
"57P03", // cannot_connect_now
"58000", // system_error
"58030", // io_error
"XX000": // internal_error
return true
default:
if strings.HasPrefix(string(pe.Code), "53") {
// Class 53 - Insufficient Resources
return true
}
}
}

View file

@ -236,8 +236,8 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return errors.Wrap(errBegin, "can't start transaction")
}
query := `SELECT id, heartbeat FROM icingadb_instance` +
` WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`
query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance " +
"WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?")
instance := &v1.IcingadbInstance{}
@ -339,7 +339,7 @@ func (h *HA) insertEnvironment() error {
func (h *HA) removeInstance(ctx context.Context) {
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
// Intentionally not using h.ctx here as it's already cancelled.
query := "DELETE FROM icingadb_instance WHERE id = ?"
query := h.db.Rebind("DELETE FROM icingadb_instance WHERE id = ?")
_, err := h.db.ExecContext(ctx, query, h.instanceId)
if err != nil {
h.logger.Warnw("Could not remove instance from database", zap.Error(err), zap.String("query", query))
@ -351,8 +351,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
case <-h.ctx.Done():
return
case <-time.After(timeout):
query := "DELETE FROM icingadb_instance " +
"WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?"
query := h.db.Rebind("DELETE FROM icingadb_instance " +
"WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?")
heartbeat := types.UnixMilli(time.Now().Add(-timeout))
result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId,
s.EndpointId, heartbeat)

View file

@ -109,7 +109,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted,
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId,
)
})
g.Go(func() error {
@ -213,7 +213,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars,
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId,
)
})
g.Go(func() error {
@ -248,7 +248,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars,
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, com.SplitOnDupId,
)
})
g.Go(func() error {

View file

@ -7,6 +7,7 @@ import (
"encoding"
"encoding/json"
"github.com/icinga/icingadb/internal"
"strings"
)
// String adds JSON support to sql.NullString.
@ -52,6 +53,17 @@ func (s *String) UnmarshalJSON(data []byte) error {
return nil
}
// Value implements the driver.Valuer interface.
// Supports SQL NULL.
func (s String) Value() (driver.Value, error) {
if !s.Valid {
return nil, nil
}
// PostgreSQL does not allow null bytes in varchar, char and text fields.
return strings.ReplaceAll(s.String, "\x00", ""), nil
}
// Assert interface compliance.
var (
_ json.Marshaler = String{}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"github.com/go-sql-driver/mysql"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/lib/pq"
"github.com/pkg/errors"
"golang.org/x/exp/utf8string"
"math"
@ -125,6 +126,16 @@ func IsDeadlock(err error) bool {
switch e.Number {
case 1205, 1213:
return true
default:
return false
}
}
var pe *pq.Error
if errors.As(err, &pe) {
switch pe.Code {
case "40001", "40P01":
return true
}
}

1910
schema/pgsql/schema.sql Normal file

File diff suppressed because it is too large Load diff