diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index bbd02e6d..4494eaf4 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -529,6 +529,28 @@ func (db *DB) CreateStreamed( ) } +// CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateIgnoreStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, placeholders := db.BuildInsertIgnoreStmt(first) + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., + ) +} + // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and