From d356909edc0efbb9a18c6a6b5f217c6869385eff Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 29 Oct 2021 21:01:28 +0200 Subject: [PATCH] Add special handling for bulks of size 1 Co-authored-by: Eric Lippmann --- pkg/com/bulker.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index 8dcfecb4..9b7930dc 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -93,5 +93,33 @@ func (b *Bulker) run(ch <-chan interface{}, count int) { // Bulk reads all values from a channel and streams them in chunks into a returned channel. func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} { + if count <= 1 { + return oneBulk(ctx, ch) + } + return NewBulker(ctx, ch, count).Bulk() } + +// oneBulk operates just as NewBulker(ctx, ch, 1).Bulk(), +// but without the overhead of the actual bulk creation with a buffer channel and timeout. +func oneBulk(ctx context.Context, ch <-chan interface{}) <-chan []interface{} { + out := make(chan []interface{}) + go func() { + defer close(out) + + for { + select { + case item := <-ch: + select { + case out <- []interface{}{item}: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return out +}