From 507a414ca7248597a91425bb608097276ba32cd3 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Mon, 3 May 2021 15:46:56 +0200 Subject: [PATCH] Add com.EntityBulker --- pkg/com/entity_bulker.go | 96 ++++++++++++++++++++++++++++++++++++++++ pkg/icingadb/db.go | 4 +- 2 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 pkg/com/entity_bulker.go diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go new file mode 100644 index 00000000..443063fd --- /dev/null +++ b/pkg/com/entity_bulker.go @@ -0,0 +1,96 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" + "sync" + "time" +) + +type EntityBulker struct { + ch chan []contracts.Entity + ctx context.Context + mu sync.Mutex + err error +} + +func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker { + b := &EntityBulker{ + ch: make(chan []contracts.Entity), + ctx: ctx, + mu: sync.Mutex{}, + } + + go b.run(ch, count) + + return b +} + +// Bulk returns the channel on which the bulks are delivered. +func (b *EntityBulker) Bulk() <-chan []contracts.Entity { + return b.ch +} + +func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) { + defer close(b.ch) + + bufCh := make(chan contracts.Entity, count) + g, ctx := errgroup.WithContext(b.ctx) + + g.Go(func() error { + defer close(bufCh) + + for { + select { + case v, ok := <-ch: + if !ok { + return nil + } + + bufCh <- v + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + for done := false; !done; { + buf := make([]contracts.Entity, 0, count) + timeout := time.After(256 * time.Millisecond) + + for drain := true; drain && len(buf) < count; { + select { + case v, ok := <-bufCh: + if !ok { + drain = false + done = true + + break + } + + buf = append(buf, v) + case <-timeout: + drain = false + case <-ctx.Done(): + return ctx.Err() + } + } + + if len(buf) > 0 { + b.ch <- buf + } + } + + return nil + }) + + // We don't expect an error here. + // We only use errgroup for the encapsulated use of sync.WaitGroup. + _ = g.Wait() +} + +func BulkEntities(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity { + return NewEntityBulker(ctx, ch, count).Bulk() +} diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 80a4592d..369a1e03 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -172,7 +172,7 @@ func (db DB) NamedBulkExec( ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) - bulk := com.Bulk(ctx, arg, count) + bulk := com.BulkEntities(ctx, arg, count) db.logger.Debugf("Executing %s", query) defer utils.Timed(time.Now(), func(elapsed time.Duration) { @@ -245,7 +245,7 @@ func (db DB) NamedBulkExecTx( ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) - bulk := com.Bulk(ctx, arg, count) + bulk := com.BulkEntities(ctx, arg, count) db.logger.Debugf("Executing %s", query) defer utils.Timed(time.Now(), func(elapsed time.Duration) {