Add special handling for bulks of size 1

Co-authored-by: Eric Lippmann <eric.lippmann@icinga.com>
This commit is contained in:
Alexander A. Klimov 2021-10-29 21:01:28 +02:00 committed by Eric Lippmann
parent fecf332b8e
commit d356909edc

View file

@ -93,5 +93,33 @@ func (b *Bulker) run(ch <-chan interface{}, count int) {
// Bulk reads all values from a channel and streams them in chunks into a returned channel.
func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} {
if count <= 1 {
return oneBulk(ctx, ch)
}
return NewBulker(ctx, ch, count).Bulk()
}
// oneBulk operates just as NewBulker(ctx, ch, 1).Bulk(),
// but without the overhead of the actual bulk creation with a buffer channel and timeout.
func oneBulk(ctx context.Context, ch <-chan interface{}) <-chan []interface{} {
out := make(chan []interface{})
go func() {
defer close(out)
for {
select {
case item := <-ch:
select {
case out <- []interface{}{item}:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}