From df0124de09a549237ef74402faf1dd6897cb16fe Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 2 Mar 2021 17:00:42 +0100 Subject: [PATCH] DB#NamedBulkExec(): optionally inform about successfully inserted rows --- pkg/icingadb/db.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index c6306a41..b2ca4ce8 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -159,7 +159,8 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i } func (db DB) NamedBulkExec( - ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, + ctx context.Context, query string, count int, concurrent int, + arg chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -206,6 +207,16 @@ func (db DB) NamedBulkExec( cnt.Add(uint64(len(b))) + if succeeded != nil { + for _, row := range b { + select { + case <-ctx.Done(): + return ctx.Err() + case succeeded <- row: + } + } + } + return nil }, IsRetryable, @@ -354,7 +365,7 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error } }() - return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts) + return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts, nil) } func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error {