diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go index b4911c2e..1ccb873b 100644 --- a/pkg/com/entity_bulker.go +++ b/pkg/com/entity_bulker.go @@ -94,5 +94,33 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) { // BulkEntities reads all entities from a channel and streams them in chunks into a returned channel. func BulkEntities(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity { + if count <= 1 { + return oneEntityBulk(ctx, ch) + } + return NewEntityBulker(ctx, ch, count).Bulk() } + +// oneEntityBulk operates just as NewEntityBulker(ctx, ch, 1).Bulk(), +// but without the overhead of the actual bulk creation with a buffer channel and timeout. +func oneEntityBulk(ctx context.Context, ch <-chan contracts.Entity) <-chan []contracts.Entity { + out := make(chan []contracts.Entity) + go func() { + defer close(out) + + for { + select { + case item := <-ch: + select { + case out <- []contracts.Entity{item}: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return out +}