From f82de0c1a949b5008eb8fad1468a22a72a817d93 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 30 Apr 2021 12:29:38 +0200 Subject: [PATCH] Make Redis/MySQL concurrency and batch sizes configurable --- pkg/config/database.go | 19 +++++--- pkg/config/redis.go | 36 ++++++++++++-- pkg/icingadb/db.go | 85 +++++++++++++++++++++------------ pkg/icingadb/runtime_updates.go | 3 +- pkg/icingadb/sync.go | 10 ---- pkg/icingaredis/client.go | 27 +++++++---- 6 files changed, 117 insertions(+), 63 deletions(-) diff --git a/pkg/config/database.go b/pkg/config/database.go index ad760331..e9bfebb7 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -1,6 +1,7 @@ package config import ( + "errors" "fmt" "github.com/creasty/defaults" "github.com/icinga/icingadb/pkg/driver" @@ -16,12 +17,12 @@ var registerDriverOnce sync.Once // Database defines database client configuration. type Database struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - Database string `yaml:"database"` - User string `yaml:"user"` - Password string `yaml:"password"` - MaxConnections int `yaml:"max_connections" default:"16"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + User string `yaml:"user"` + Password string `yaml:"password"` + icingadb.Options `yaml:",inline"` } // Open prepares the DSN string and driver configuration, @@ -47,7 +48,7 @@ func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) { return utils.Key(s, '_') }) - return icingadb.NewDb(db, logger), nil + return icingadb.NewDb(db, logger, &d.Options), nil } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -61,5 +62,9 @@ func (d *Database) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } + if d.MaxConnectionsPerTable < 1 { + return errors.New("max_connections_per_table must be at least 1") + } + return nil } diff --git a/pkg/config/redis.go b/pkg/config/redis.go index d6a3b2a6..4a01be44 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -1,16 +1,18 @@ package config import ( + "errors" + "github.com/creasty/defaults" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/icingaredis" "go.uber.org/zap" - "time" ) // Redis defines Redis client configuration. type Redis struct { - Address string `yaml:"address"` - Password string `yaml:"password"` + Address string `yaml:"address"` + Password string `yaml:"password"` + icingaredis.Options `yaml:",inline"` } // NewClient prepares Redis client configuration, @@ -20,8 +22,32 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error Addr: r.Address, Password: r.Password, DB: 0, // Use default DB, - ReadTimeout: 30 * time.Second, + ReadTimeout: r.Timeout, }) - return icingaredis.NewClient(c, logger), nil + return icingaredis.NewClient(c, logger, &r.Options), nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (d *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(d); err != nil { + return err + } + // Prevent recursion. + type self Redis + if err := unmarshal((*self)(d)); err != nil { + return err + } + + if d.MaxHMGetConnections < 1 { + return errors.New("max_hmget_connections must be at least 1") + } + if d.HMGetCount < 1 { + return errors.New("hmget_count must be at least 1") + } + if d.HScanCount < 1 { + return errors.New("hscan_count must be at least 1") + } + + return nil } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 994653e0..31127258 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sync/semaphore" "reflect" "strings" + "sync" "time" ) @@ -24,15 +25,28 @@ import ( type DB struct { *sqlx.DB - logger *zap.SugaredLogger + logger *zap.SugaredLogger + options *Options + tableSemaphores map[string]*semaphore.Weighted + tableSemaphoresMu sync.Mutex +} + +type Options struct { + MaxConnections int `yaml:"max_connections" default:"16"` + MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"` } // NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. -func NewDb(db *sqlx.DB, logger *zap.SugaredLogger) *DB { - return &DB{DB: db, logger: logger} +func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB { + return &DB{ + DB: db, + logger: logger, + options: options, + tableSemaphores: make(map[string]*semaphore.Weighted), + } } -func (db DB) BuildColumns(subject interface{}) []string { +func (db *DB) BuildColumns(subject interface{}) []string { fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names columns := make([]string, 0, len(fields)) for _, f := range fields { @@ -45,14 +59,14 @@ func (db DB) BuildColumns(subject interface{}) []string { return columns } -func (db DB) BuildDeleteStmt(from interface{}) string { +func (db *DB) BuildDeleteStmt(from interface{}) string { return fmt.Sprintf( `DELETE FROM %s WHERE id IN (?)`, utils.TableName(from), ) } -func (db DB) BuildInsertStmt(into interface{}) string { +func (db *DB) BuildInsertStmt(into interface{}) string { columns := db.BuildColumns(into) return fmt.Sprintf( @@ -63,7 +77,7 @@ func (db DB) BuildInsertStmt(into interface{}) string { ) } -func (db DB) BuildSelectStmt(from interface{}, into interface{}) string { +func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string { return fmt.Sprintf( `SELECT %s FROM %s`, strings.Join(db.BuildColumns(into), ", "), @@ -71,7 +85,7 @@ func (db DB) BuildSelectStmt(from interface{}, into interface{}) string { ) } -func (db DB) BuildUpdateStmt(update interface{}) string { +func (db *DB) BuildUpdateStmt(update interface{}) string { columns := db.BuildColumns(update) set := make([]string, 0, len(columns)) @@ -86,7 +100,7 @@ func (db DB) BuildUpdateStmt(update interface{}) string { ) } -func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { +func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { insertColumns := db.BuildColumns(subject) var updateColumns []string @@ -111,7 +125,7 @@ func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int ), len(insertColumns) + len(updateColumns) } -func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent int, arg <-chan interface{}) error { +func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) // Use context from group. @@ -123,8 +137,6 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i }) g.Go(func() error { - sem := semaphore.NewWeighted(int64(concurrent)) - g, ctx := errgroup.WithContext(ctx) for b := range bulk { @@ -166,8 +178,8 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i return g.Wait() } -func (db DB) NamedBulkExec( - ctx context.Context, query string, count int, concurrent int, +func (db *DB) NamedBulkExec( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { var cnt com.Counter @@ -180,7 +192,6 @@ func (db DB) NamedBulkExec( }) g.Go(func() error { - sem := semaphore.NewWeighted(int64(concurrent)) // stmt, err := db.PrepareNamedContext(ctx, query) // if err != nil { // return err @@ -238,8 +249,8 @@ func (db DB) NamedBulkExec( return g.Wait() } -func (db DB) NamedBulkExecTx( - ctx context.Context, query string, count int, concurrent int, arg <-chan contracts.Entity, +func (db *DB) NamedBulkExecTx( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -251,8 +262,6 @@ func (db DB) NamedBulkExecTx( }) g.Go(func() error { - sem := semaphore.NewWeighted(int64(concurrent)) - for { select { case b, ok := <-bulk: @@ -308,7 +317,7 @@ func (db DB) NamedBulkExecTx( return g.Wait() } -func (db DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) { +func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) { var cnt com.Counter entities := make(chan contracts.Entity, 1) g, ctx := errgroup.WithContext(ctx) @@ -349,16 +358,17 @@ func (db DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFu return entities, com.WaitAsync(g) } -func (db DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { +func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return err } - return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), 1<<3, forward, nil) + sem := db.getSemaphoreForTable(utils.TableName(first)) + return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), sem, forward, nil) } -func (db DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { +func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return err @@ -368,23 +378,25 @@ func (db DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entit //stmt, placeholders := db.BuildUpsertStmt(first) //return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded) stmt, _ := db.BuildUpsertStmt(first) - return db.NamedBulkExec(ctx, stmt, 1, 1<<3, forward, succeeded) + sem := db.getSemaphoreForTable(utils.TableName(first)) + return db.NamedBulkExec(ctx, stmt, 1, sem, forward, succeeded) } -func (db DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { +func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return err } - - return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, 1<<3, forward) + sem := db.getSemaphoreForTable(utils.TableName(first)) + return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, sem, forward) } -func (db DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { - return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), 1<<15, 1<<3, ids) +func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { + sem := db.getSemaphoreForTable(utils.TableName(entityType)) + return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), 1<<15, sem, ids) } -func (db DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { +func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { idsCh := make(chan interface{}, len(ids)) for _, id := range ids { idsCh <- id @@ -415,3 +427,16 @@ func IsRetryable(err error) bool { return false } + +func (db *DB) getSemaphoreForTable(table string) *semaphore.Weighted { + db.tableSemaphoresMu.Lock() + defer db.tableSemaphoresMu.Unlock() + + if sem, ok := db.tableSemaphores[table]; ok { + return sem + } else { + sem = semaphore.NewWeighted(int64(db.options.MaxConnectionsPerTable)) + db.tableSemaphores[table] = sem + return sem + } +} diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 6521b38e..16e23530 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -59,7 +59,8 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti g.Go(func() error { stmt, _ := r.db.BuildUpsertStmt(v) // TODO(nh) Currently not possible to increase the count here: https://github.com/jmoiron/sqlx/issues/694 - return r.db.NamedBulkExec(ctx, stmt, 1, 1, upsertEntities, nil) + sem := r.db.getSemaphoreForTable(utils.TableName(v)) + return r.db.NamedBulkExec(ctx, stmt, 1, sem, upsertEntities, nil) }) g.Go(func() error { return r.db.DeleteStreamed(ctx, v, deleteIds) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 16e703a0..3b0e0eed 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -14,12 +14,6 @@ import ( "time" ) -var ( - // Redis concurrency settings. - count = 1 << 12 - concurrent = 1 << 3 -) - // Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities. type Sync struct { db *DB @@ -108,8 +102,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { pairs, errs := s.redis.HMYield( ctx, fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), - count, - concurrent, delta.Create.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) @@ -135,8 +127,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { pairs, errs := s.redis.HMYield( ctx, fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), - count, - concurrent, delta.Update.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 8a024cae..0b9dda7b 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -19,12 +19,20 @@ import ( type Client struct { *redis.Client - logger *zap.SugaredLogger + logger *zap.SugaredLogger + options *Options +} + +type Options struct { + Timeout time.Duration `yaml:"timeout" default:"30s"` + MaxHMGetConnections int `yaml:"max_hmget_connections" default:"4096"` + HMGetCount int `yaml:"hmget_count" default:"4096"` + HScanCount int `yaml:"hscan_count" default:"4096"` } // NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client. -func NewClient(client *redis.Client, logger *zap.SugaredLogger) *Client { - return &Client{Client: client, logger: logger} +func NewClient(client *redis.Client, logger *zap.SugaredLogger, options *Options) *Client { + return &Client{Client: client, logger: logger, options: options} } // HPair defines Redis hashes field-value pairs. @@ -34,7 +42,7 @@ type HPair struct { } // HYield yields HPair field-value pairs for all fields in the hash stored at key. -func (c *Client) HYield(ctx context.Context, key string, count int) (<-chan HPair, <-chan error) { +func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) { pairs := make(chan HPair) g, ctx := errgroup.WithContext(ctx) @@ -55,8 +63,7 @@ func (c *Client) HYield(ctx context.Context, key string, count int) (<-chan HPai g, ctx := errgroup.WithContext(ctx) for { - page, cursor, err = c.HScan( - ctx, key, cursor, "", int64(count)).Result() + page, cursor, err = c.HScan(ctx, key, cursor, "", int64(c.options.HScanCount)).Result() if err != nil { return err } @@ -91,16 +98,16 @@ func (c *Client) HYield(ctx context.Context, key string, count int) (<-chan HPai } // HMYield yields HPair field-value pairs for the specified fields in the hash stored at key. -func (c *Client) HMYield(ctx context.Context, key string, count int, concurrent int, fields ...string) (<-chan HPair, <-chan error) { +func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error) { pairs := make(chan HPair) g, ctx := errgroup.WithContext(ctx) // Use context from group. - batches := utils.BatchSliceOfStrings(ctx, fields, count) + batches := utils.BatchSliceOfStrings(ctx, fields, c.options.HMGetCount) g.Go(func() error { defer close(pairs) - sem := semaphore.NewWeighted(int64(concurrent)) + sem := semaphore.NewWeighted(int64(c.options.MaxHMGetConnections)) g, ctx := errgroup.WithContext(ctx) @@ -174,7 +181,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch key = "icinga:" + key } - pairs, errs := c.HYield(ctx, key, 1<<12) + pairs, errs := c.HYield(ctx, key) g, ctx := errgroup.WithContext(ctx) // Let errors from HYield cancel the group. com.ErrgroupReceive(g, errs)