diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index c6306a41..b2ca4ce8 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -159,7 +159,8 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i } func (db DB) NamedBulkExec( - ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, + ctx context.Context, query string, count int, concurrent int, + arg chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -206,6 +207,16 @@ func (db DB) NamedBulkExec( cnt.Add(uint64(len(b))) + if succeeded != nil { + for _, row := range b { + select { + case <-ctx.Done(): + return ctx.Err() + case succeeded <- row: + } + } + } + return nil }, IsRetryable, @@ -354,7 +365,7 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error } }() - return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts) + return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts, nil) } func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error {