Introduce DB#CreateIgnoreStreamed()

This commit is contained in:
Alexander A. Klimov 2022-08-19 15:24:23 +02:00
parent 23130d7be8
commit adcd004231

View file

@ -529,6 +529,28 @@ func (db *DB) CreateStreamed(
)
}
// CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec.
// The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) CreateIgnoreStreamed(
ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return errors.Wrap(err, "can't copy first entity")
}
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildInsertIgnoreStmt(first)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, com.SplitOnDupId[contracts.Entity], onSuccess...,
)
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and