From cc352252ecea8e969df81fe94f6fffb1bae7bbaa Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 31 May 2022 18:11:30 +0200 Subject: [PATCH] DB#CreateStreamed(): allow monitoring succeeded items --- pkg/icingadb/db.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index e5375bd8..ea0e0a96 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -511,7 +511,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF // The insert statement is created using BuildInsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return errors.Wrap(err, "can't copy first entity") @@ -521,7 +524,8 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti stmt, placeholders := db.BuildInsertStmt(first) return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[contracts.Entity], + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.NeverSplit[contracts.Entity], onSuccess..., ) }