Merge pull request #46 from lippserd/feature/configurable-concurrency

Add configuration options for MySQL/Redis concurrency and bulk sizes
This commit is contained in:
Eric Lippmann 2021-05-12 14:12:24 +02:00 committed by GitHub
commit 2bf0aac846
6 changed files with 117 additions and 63 deletions

View file

@ -1,6 +1,7 @@
package config
import (
"errors"
"fmt"
"github.com/creasty/defaults"
"github.com/icinga/icingadb/pkg/driver"
@ -16,12 +17,12 @@ var registerDriverOnce sync.Once
// Database defines database client configuration.
type Database struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
User string `yaml:"user"`
Password string `yaml:"password"`
MaxConnections int `yaml:"max_connections" default:"16"`
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
User string `yaml:"user"`
Password string `yaml:"password"`
icingadb.Options `yaml:",inline"`
}
// Open prepares the DSN string and driver configuration,
@ -47,7 +48,7 @@ func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) {
return utils.Key(s, '_')
})
return icingadb.NewDb(db, logger), nil
return icingadb.NewDb(db, logger, &d.Options), nil
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -61,5 +62,9 @@ func (d *Database) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err
}
if d.MaxConnectionsPerTable < 1 {
return errors.New("max_connections_per_table must be at least 1")
}
return nil
}

View file

@ -1,16 +1,18 @@
package config
import (
"errors"
"github.com/creasty/defaults"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/icingaredis"
"go.uber.org/zap"
"time"
)
// Redis defines Redis client configuration.
type Redis struct {
Address string `yaml:"address"`
Password string `yaml:"password"`
Address string `yaml:"address"`
Password string `yaml:"password"`
icingaredis.Options `yaml:",inline"`
}
// NewClient prepares Redis client configuration,
@ -20,8 +22,32 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error
Addr: r.Address,
Password: r.Password,
DB: 0, // Use default DB,
ReadTimeout: 30 * time.Second,
ReadTimeout: r.Timeout,
})
return icingaredis.NewClient(c, logger), nil
return icingaredis.NewClient(c, logger, &r.Options), nil
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (d *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := defaults.Set(d); err != nil {
return err
}
// Prevent recursion.
type self Redis
if err := unmarshal((*self)(d)); err != nil {
return err
}
if d.MaxHMGetConnections < 1 {
return errors.New("max_hmget_connections must be at least 1")
}
if d.HMGetCount < 1 {
return errors.New("hmget_count must be at least 1")
}
if d.HScanCount < 1 {
return errors.New("hscan_count must be at least 1")
}
return nil
}

View file

@ -16,6 +16,7 @@ import (
"golang.org/x/sync/semaphore"
"reflect"
"strings"
"sync"
"time"
)
@ -24,15 +25,28 @@ import (
type DB struct {
*sqlx.DB
logger *zap.SugaredLogger
logger *zap.SugaredLogger
options *Options
tableSemaphores map[string]*semaphore.Weighted
tableSemaphoresMu sync.Mutex
}
type Options struct {
MaxConnections int `yaml:"max_connections" default:"16"`
MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`
}
// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
func NewDb(db *sqlx.DB, logger *zap.SugaredLogger) *DB {
return &DB{DB: db, logger: logger}
func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB {
return &DB{
DB: db,
logger: logger,
options: options,
tableSemaphores: make(map[string]*semaphore.Weighted),
}
}
func (db DB) BuildColumns(subject interface{}) []string {
func (db *DB) BuildColumns(subject interface{}) []string {
fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names
columns := make([]string, 0, len(fields))
for _, f := range fields {
@ -45,14 +59,14 @@ func (db DB) BuildColumns(subject interface{}) []string {
return columns
}
func (db DB) BuildDeleteStmt(from interface{}) string {
func (db *DB) BuildDeleteStmt(from interface{}) string {
return fmt.Sprintf(
`DELETE FROM %s WHERE id IN (?)`,
utils.TableName(from),
)
}
func (db DB) BuildInsertStmt(into interface{}) string {
func (db *DB) BuildInsertStmt(into interface{}) string {
columns := db.BuildColumns(into)
return fmt.Sprintf(
@ -63,7 +77,7 @@ func (db DB) BuildInsertStmt(into interface{}) string {
)
}
func (db DB) BuildSelectStmt(from interface{}, into interface{}) string {
func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string {
return fmt.Sprintf(
`SELECT %s FROM %s`,
strings.Join(db.BuildColumns(into), ", "),
@ -71,7 +85,7 @@ func (db DB) BuildSelectStmt(from interface{}, into interface{}) string {
)
}
func (db DB) BuildUpdateStmt(update interface{}) string {
func (db *DB) BuildUpdateStmt(update interface{}) string {
columns := db.BuildColumns(update)
set := make([]string, 0, len(columns))
@ -86,7 +100,7 @@ func (db DB) BuildUpdateStmt(update interface{}) string {
)
}
func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) {
func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) {
insertColumns := db.BuildColumns(subject)
var updateColumns []string
@ -111,7 +125,7 @@ func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int
), len(insertColumns) + len(updateColumns)
}
func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent int, arg <-chan interface{}) error {
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error {
var cnt com.Counter
g, ctx := errgroup.WithContext(ctx)
// Use context from group.
@ -123,8 +137,6 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i
})
g.Go(func() error {
sem := semaphore.NewWeighted(int64(concurrent))
g, ctx := errgroup.WithContext(ctx)
for b := range bulk {
@ -166,8 +178,8 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i
return g.Wait()
}
func (db DB) NamedBulkExec(
ctx context.Context, query string, count int, concurrent int,
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
) error {
var cnt com.Counter
@ -180,7 +192,6 @@ func (db DB) NamedBulkExec(
})
g.Go(func() error {
sem := semaphore.NewWeighted(int64(concurrent))
// stmt, err := db.PrepareNamedContext(ctx, query)
// if err != nil {
// return err
@ -238,8 +249,8 @@ func (db DB) NamedBulkExec(
return g.Wait()
}
func (db DB) NamedBulkExecTx(
ctx context.Context, query string, count int, concurrent int, arg <-chan contracts.Entity,
func (db *DB) NamedBulkExecTx(
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
) error {
var cnt com.Counter
g, ctx := errgroup.WithContext(ctx)
@ -251,8 +262,6 @@ func (db DB) NamedBulkExecTx(
})
g.Go(func() error {
sem := semaphore.NewWeighted(int64(concurrent))
for {
select {
case b, ok := <-bulk:
@ -308,7 +317,7 @@ func (db DB) NamedBulkExecTx(
return g.Wait()
}
func (db DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) {
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) {
var cnt com.Counter
entities := make(chan contracts.Entity, 1)
g, ctx := errgroup.WithContext(ctx)
@ -349,16 +358,17 @@ func (db DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFu
return entities, com.WaitAsync(g)
}
func (db DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return err
}
return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), 1<<3, forward, nil)
sem := db.getSemaphoreForTable(utils.TableName(first))
return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), sem, forward, nil)
}
func (db DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return err
@ -368,23 +378,25 @@ func (db DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entit
//stmt, placeholders := db.BuildUpsertStmt(first)
//return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded)
stmt, _ := db.BuildUpsertStmt(first)
return db.NamedBulkExec(ctx, stmt, 1, 1<<3, forward, succeeded)
sem := db.getSemaphoreForTable(utils.TableName(first))
return db.NamedBulkExec(ctx, stmt, 1, sem, forward, succeeded)
}
func (db DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return err
}
return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, 1<<3, forward)
sem := db.getSemaphoreForTable(utils.TableName(first))
return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, sem, forward)
}
func (db DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), 1<<15, 1<<3, ids)
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
sem := db.getSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), 1<<15, sem, ids)
}
func (db DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error {
func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error {
idsCh := make(chan interface{}, len(ids))
for _, id := range ids {
idsCh <- id
@ -415,3 +427,16 @@ func IsRetryable(err error) bool {
return false
}
func (db *DB) getSemaphoreForTable(table string) *semaphore.Weighted {
db.tableSemaphoresMu.Lock()
defer db.tableSemaphoresMu.Unlock()
if sem, ok := db.tableSemaphores[table]; ok {
return sem
} else {
sem = semaphore.NewWeighted(int64(db.options.MaxConnectionsPerTable))
db.tableSemaphores[table] = sem
return sem
}
}

View file

@ -59,7 +59,8 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
g.Go(func() error {
stmt, _ := r.db.BuildUpsertStmt(v)
// TODO(nh) Currently not possible to increase the count here: https://github.com/jmoiron/sqlx/issues/694
return r.db.NamedBulkExec(ctx, stmt, 1, 1, upsertEntities, nil)
sem := r.db.getSemaphoreForTable(utils.TableName(v))
return r.db.NamedBulkExec(ctx, stmt, 1, sem, upsertEntities, nil)
})
g.Go(func() error {
return r.db.DeleteStreamed(ctx, v, deleteIds)

View file

@ -14,12 +14,6 @@ import (
"time"
)
var (
// Redis concurrency settings.
count = 1 << 12
concurrent = 1 << 3
)
// Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities.
type Sync struct {
db *DB
@ -108,8 +102,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')),
count,
concurrent,
delta.Create.Keys()...)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, errs)
@ -135,8 +127,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')),
count,
concurrent,
delta.Update.Keys()...)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, errs)

View file

@ -19,12 +19,20 @@ import (
type Client struct {
*redis.Client
logger *zap.SugaredLogger
logger *zap.SugaredLogger
options *Options
}
type Options struct {
Timeout time.Duration `yaml:"timeout" default:"30s"`
MaxHMGetConnections int `yaml:"max_hmget_connections" default:"4096"`
HMGetCount int `yaml:"hmget_count" default:"4096"`
HScanCount int `yaml:"hscan_count" default:"4096"`
}
// NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client.
func NewClient(client *redis.Client, logger *zap.SugaredLogger) *Client {
return &Client{Client: client, logger: logger}
func NewClient(client *redis.Client, logger *zap.SugaredLogger, options *Options) *Client {
return &Client{Client: client, logger: logger, options: options}
}
// HPair defines Redis hashes field-value pairs.
@ -34,7 +42,7 @@ type HPair struct {
}
// HYield yields HPair field-value pairs for all fields in the hash stored at key.
func (c *Client) HYield(ctx context.Context, key string, count int) (<-chan HPair, <-chan error) {
func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) {
pairs := make(chan HPair)
g, ctx := errgroup.WithContext(ctx)
@ -55,8 +63,7 @@ func (c *Client) HYield(ctx context.Context, key string, count int) (<-chan HPai
g, ctx := errgroup.WithContext(ctx)
for {
page, cursor, err = c.HScan(
ctx, key, cursor, "", int64(count)).Result()
page, cursor, err = c.HScan(ctx, key, cursor, "", int64(c.options.HScanCount)).Result()
if err != nil {
return err
}
@ -91,16 +98,16 @@ func (c *Client) HYield(ctx context.Context, key string, count int) (<-chan HPai
}
// HMYield yields HPair field-value pairs for the specified fields in the hash stored at key.
func (c *Client) HMYield(ctx context.Context, key string, count int, concurrent int, fields ...string) (<-chan HPair, <-chan error) {
func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error) {
pairs := make(chan HPair)
g, ctx := errgroup.WithContext(ctx)
// Use context from group.
batches := utils.BatchSliceOfStrings(ctx, fields, count)
batches := utils.BatchSliceOfStrings(ctx, fields, c.options.HMGetCount)
g.Go(func() error {
defer close(pairs)
sem := semaphore.NewWeighted(int64(concurrent))
sem := semaphore.NewWeighted(int64(c.options.MaxHMGetConnections))
g, ctx := errgroup.WithContext(ctx)
@ -174,7 +181,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch
key = "icinga:" + key
}
pairs, errs := c.HYield(ctx, key, 1<<12)
pairs, errs := c.HYield(ctx, key)
g, ctx := errgroup.WithContext(ctx)
// Let errors from HYield cancel the group.
com.ErrgroupReceive(g, errs)