From adcd004231af108519038eca160ecee28b3507ea Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 19 Aug 2022 15:24:23 +0200 Subject: [PATCH] Introduce DB#CreateIgnoreStreamed() --- pkg/icingadb/db.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index bbd02e6d..4494eaf4 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -529,6 +529,28 @@ func (db *DB) CreateStreamed( ) } +// CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateIgnoreStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, placeholders := db.BuildInsertIgnoreStmt(first) + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., + ) +} + // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and