diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index e5375bd8..ea0e0a96 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -511,7 +511,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF // The insert statement is created using BuildInsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return errors.Wrap(err, "can't copy first entity") @@ -521,7 +524,8 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti stmt, placeholders := db.BuildInsertStmt(first) return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[contracts.Entity], + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.NeverSplit[contracts.Entity], onSuccess..., ) }