DB#CreateStreamed(): allow monitoring succeeded items

This commit is contained in:
Alexander A. Klimov 2022-05-31 18:11:30 +02:00
parent db4725218e
commit cc352252ec

View file

@ -511,7 +511,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
// The insert statement is created using BuildInsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) CreateStreamed(
ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return errors.Wrap(err, "can't copy first entity")
@ -521,7 +524,8 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
stmt, placeholders := db.BuildInsertStmt(first)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[contracts.Entity],
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, com.NeverSplit[contracts.Entity], onSuccess...,
)
}