From 5cba0f9e22ee3933f4d02a5a52f48461a67cd666 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 1 Jul 2021 11:13:17 +0200 Subject: [PATCH] Return number of placeholders --- pkg/icingadb/db.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 46a435f4..852113ad 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -67,7 +67,7 @@ func (db *DB) BuildDeleteStmt(from interface{}) string { ) } -func (db *DB) BuildInsertStmt(into interface{}) string { +func (db *DB) BuildInsertStmt(into interface{}) (string, int) { columns := db.BuildColumns(into) return fmt.Sprintf( @@ -75,7 +75,7 @@ func (db *DB) BuildInsertStmt(into interface{}) string { utils.TableName(into), strings.Join(columns, ", "), fmt.Sprintf(":%s", strings.Join(columns, ", :")), - ) + ), len(columns) } func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string { @@ -86,7 +86,7 @@ func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string { ) } -func (db *DB) BuildUpdateStmt(update interface{}) string { +func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { columns := db.BuildColumns(update) set := make([]string, 0, len(columns)) @@ -98,7 +98,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) string { `UPDATE %s SET %s WHERE id = :id`, utils.TableName(update), strings.Join(set, ", "), - ) + ), len(columns) } func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { @@ -371,7 +371,9 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti } sem := db.getSemaphoreForTable(utils.TableName(first)) - return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), sem, forward, nil) + stmt, placeholders := db.BuildInsertStmt(first) + + return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, sem, forward, nil) } func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { @@ -394,7 +396,9 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti return errors.Wrap(err, "can't copy first entity") } sem := db.getSemaphoreForTable(utils.TableName(first)) - return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, sem, forward) + stmt, placeholders := db.BuildUpdateStmt(first) + + return db.NamedBulkExecTx(ctx, stmt, 1<<15/placeholders, sem, forward) } func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {