NamedBulkExec(): allow custom BulkChunkSplitPolicy

Additionally, keep duplicate rows out of the same upsert chunk to prevent Postgres
error 21000 (ON CONFLICT DO UPDATE command cannot affect row a second time).

refs #136
This commit is contained in:
Alexander A. Klimov 2021-11-19 16:55:29 +01:00
parent db2c3af769
commit 7f4b895ea9
2 changed files with 9 additions and 7 deletions

View file

@ -304,13 +304,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
// Entities for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, splitPolicy com.BulkChunkSplitPolicy,
) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count, com.NeverSplit{})
bulk := com.BulkEntities(ctx, arg, count, splitPolicy)
g.Go(func() error {
for {
@ -501,7 +501,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildInsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit{})
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
@ -517,7 +517,9 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildUpsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded, com.NewSplitOnDupId(),
)
}
// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx.

View file

@ -109,7 +109,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted,
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.NewSplitOnDupId(),
)
})
g.Go(func() error {
@ -213,7 +213,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars,
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.NewSplitOnDupId(),
)
})
g.Go(func() error {
@ -248,7 +248,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars,
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, com.NewSplitOnDupId(),
)
})
g.Go(func() error {