From bbba443529a23679d331fae414c91bdf74cc83a2 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Fri, 30 Jul 2021 17:12:29 +0200 Subject: [PATCH] Reduce the size of bulk statements and make the size configurable Reduce the size of the bulk create, update, and delete statements to also reduce query execution time so that the database server can better execute statements in parallel. --- pkg/icingadb/db.go | 32 +++++++++++++++++++++++++++----- pkg/icingadb/runtime_updates.go | 2 +- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 130f7098..e36151f6 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -35,6 +35,17 @@ type DB struct { type Options struct { MaxConnections int `yaml:"max_connections" default:"16"` MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"` + + // MaxPlaceholdersPerStatement defines the maximum number of placeholders in an + // INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders, + // but this increases the execution time of queries and thus reduces the number of queries + // that can be executed in parallel in a given time. + // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. + MaxPlaceholdersPerStatement int `yaml:"max_placeholders_per_statement" default:"8192"` + + // MaxRowsPerTransaction defines the maximum number of rows per transaction. + // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. + MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` } // NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. @@ -323,6 +334,17 @@ func (db *DB) NamedBulkExecTx( return g.Wait() } +// BatchSizeByPlaceholders returns how often the specified number of placeholders fits +// into Options.MaxPlaceholdersPerStatement, but at least 1. 
+func (db *DB) BatchSizeByPlaceholders(n int) int { + // Guard n <= 0: dividing by zero panics and a negative n yields a useless batch size. + if limit := db.options.MaxPlaceholdersPerStatement; n > 0 && limit >= n { + return limit / n + } + + return 1 +} + func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) { var cnt com.Counter entities := make(chan contracts.Entity, 1) @@ -373,7 +395,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti sem := db.getSemaphoreForTable(utils.TableName(first)) stmt, placeholders := db.BuildInsertStmt(first) - return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, forward, nil) + return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil) } func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { @@ -385,7 +407,7 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti sem := db.getSemaphoreForTable(utils.TableName(first)) stmt, placeholders := db.BuildUpsertStmt(first) - return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, forward, succeeded) + return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded) } func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { @@ -394,14 +416,14 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti return errors.Wrap(err, "can't copy first entity") } sem := db.getSemaphoreForTable(utils.TableName(first)) - stmt, placeholders := db.BuildUpdateStmt(first) + stmt, _ := db.BuildUpdateStmt(first) - return db.NamedBulkExecTx(ctx, stmt, 1<<15/placeholders, sem, forward) + return db.NamedBulkExecTx(ctx, stmt, db.options.MaxRowsPerTransaction, sem, forward) } func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { sem := db.getSemaphoreForTable(utils.TableName(entityType)) - 
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), 1<<15, sem, ids) + return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.options.MaxPlaceholdersPerStatement, sem, ids) } func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index ea467296..cfde005f 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -60,7 +60,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti stmt, placeholders := r.db.BuildUpsertStmt(v) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, upsertEntities, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil) }) g.Go(func() error { return r.db.DeleteStreamed(ctx, v, deleteIds)