diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 8dfb2ab4..1d71ff91 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -304,13 +304,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph // Entities for which the query ran successfully will be streamed on the succeeded channel. func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, - arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, + arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, splitPolicy com.BulkChunkSplitPolicy, ) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() g, ctx := errgroup.WithContext(ctx) - bulk := com.BulkEntities(ctx, arg, count, com.NeverSplit{}) + bulk := com.BulkEntities(ctx, arg, count, splitPolicy) g.Go(func() error { for { @@ -501,7 +501,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti sem := db.GetSemaphoreForTable(utils.TableName(first)) stmt, placeholders := db.BuildInsertStmt(first) - return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil) + return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit{}) } // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. @@ -517,7 +517,9 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti sem := db.GetSemaphoreForTable(utils.TableName(first)) stmt, placeholders := db.BuildUpsertStmt(first) - return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded) + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded, com.NewSplitOnDupId(), + ) } // UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 6616d9f2..d063976b 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -109,7 +109,7 @@ func (r *RuntimeUpdates) Sync( sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, + ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.NewSplitOnDupId(), ) }) g.Go(func() error { @@ -213,7 +213,7 @@ func (r *RuntimeUpdates) Sync( sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, + ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.NewSplitOnDupId(), ) }) g.Go(func() error { @@ -248,7 +248,7 @@ func (r *RuntimeUpdates) Sync( sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, + ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, com.NewSplitOnDupId(), ) }) g.Go(func() error {