mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-09 00:42:37 -04:00
Add special handling for entity bulks of size 1
Co-authored-by: Eric Lippmann <eric.lippmann@icinga.com>
This commit is contained in:
parent
d356909edc
commit
7d6474f6b5
1 changed files with 28 additions and 0 deletions
|
|
@ -94,5 +94,33 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) {
|
|||
|
||||
// BulkEntities reads all entities from a channel and streams them in chunks into a returned channel.
|
||||
func BulkEntities(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity {
|
||||
if count <= 1 {
|
||||
return oneEntityBulk(ctx, ch)
|
||||
}
|
||||
|
||||
return NewEntityBulker(ctx, ch, count).Bulk()
|
||||
}
|
||||
|
||||
// oneEntityBulk operates just as NewEntityBulker(ctx, ch, 1).Bulk(),
|
||||
// but without the overhead of the actual bulk creation with a buffer channel and timeout.
|
||||
func oneEntityBulk(ctx context.Context, ch <-chan contracts.Entity) <-chan []contracts.Entity {
|
||||
out := make(chan []contracts.Entity)
|
||||
go func() {
|
||||
defer close(out)
|
||||
|
||||
for {
|
||||
select {
|
||||
case item := <-ch:
|
||||
select {
|
||||
case out <- []contracts.Entity{item}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return out
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue