mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Reduce the size of bulk statements and make the size configurable
Reduce the size of the bulk create, update, and delete statements to also reduce query execution time so that the database server can better execute statements in parallel.
This commit is contained in:
parent
ab4caa32a2
commit
bbba443529
2 changed files with 28 additions and 6 deletions
|
|
@ -35,6 +35,17 @@ type DB struct {
|
|||
type Options struct {
|
||||
MaxConnections int `yaml:"max_connections" default:"16"`
|
||||
MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`
|
||||
|
||||
// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
|
||||
// INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders,
|
||||
// but this increases the execution time of queries and thus reduces the number of queries
|
||||
// that can be executed in parallel in a given time.
|
||||
// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
|
||||
MaxPlaceholdersPerStatement int `yaml:"MaxPlaceholdersPerStatement" default:"8192"`
|
||||
|
||||
// MaxRowsPerTransaction defines the maximum number of rows per transaction.
|
||||
// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
|
||||
MaxRowsPerTransaction int `yaml:"MaxRowsPerTransaction" default:"8192"`
|
||||
}
|
||||
|
||||
// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
|
||||
|
|
@ -323,6 +334,17 @@ func (db *DB) NamedBulkExecTx(
|
|||
return g.Wait()
|
||||
}
|
||||
|
||||
// BatchSizeByPlaceholders returns how often the specified number of placeholders fits
|
||||
// into Options.MaxPlaceholdersPerStatement, but at least 1.
|
||||
func (db *DB) BatchSizeByPlaceholders(n int) int {
|
||||
s := db.options.MaxPlaceholdersPerStatement / n
|
||||
if s > 0 {
|
||||
return s
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) {
|
||||
var cnt com.Counter
|
||||
entities := make(chan contracts.Entity, 1)
|
||||
|
|
@ -373,7 +395,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
|
|||
sem := db.getSemaphoreForTable(utils.TableName(first))
|
||||
stmt, placeholders := db.BuildInsertStmt(first)
|
||||
|
||||
return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, forward, nil)
|
||||
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil)
|
||||
}
|
||||
|
||||
func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
|
||||
|
|
@ -385,7 +407,7 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
|
|||
sem := db.getSemaphoreForTable(utils.TableName(first))
|
||||
stmt, placeholders := db.BuildUpsertStmt(first)
|
||||
|
||||
return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, forward, succeeded)
|
||||
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded)
|
||||
}
|
||||
|
||||
func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
|
||||
|
|
@ -394,14 +416,14 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
|
|||
return errors.Wrap(err, "can't copy first entity")
|
||||
}
|
||||
sem := db.getSemaphoreForTable(utils.TableName(first))
|
||||
stmt, placeholders := db.BuildUpdateStmt(first)
|
||||
stmt, _ := db.BuildUpdateStmt(first)
|
||||
|
||||
return db.NamedBulkExecTx(ctx, stmt, 1<<15/placeholders, sem, forward)
|
||||
return db.NamedBulkExecTx(ctx, stmt, db.options.MaxRowsPerTransaction, sem, forward)
|
||||
}
|
||||
|
||||
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
|
||||
sem := db.getSemaphoreForTable(utils.TableName(entityType))
|
||||
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), 1<<15, sem, ids)
|
||||
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.options.MaxPlaceholdersPerStatement, sem, ids)
|
||||
}
|
||||
|
||||
func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error {
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
stmt, placeholders := r.db.BuildUpsertStmt(v)
|
||||
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
|
||||
sem := semaphore.NewWeighted(1)
|
||||
return r.db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, upsertEntities, nil)
|
||||
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil)
|
||||
})
|
||||
g.Go(func() error {
|
||||
return r.db.DeleteStreamed(ctx, v, deleteIds)
|
||||
|
|
|
|||
Loading…
Reference in a new issue