Restrict Bulker to contracts.Entity

This commit is contained in:
Alexander A. Klimov 2021-03-18 14:06:25 +01:00
parent 581cac27aa
commit b13d2c3cd7
2 changed files with 19 additions and 14 deletions

View file

@ -2,21 +2,22 @@ package com
import (
"context"
"github.com/icinga/icingadb/pkg/contracts"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
type Bulker struct {
ch chan []interface{}
ch chan []contracts.Entity
ctx context.Context
mu sync.Mutex
err error
}
func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker {
func NewBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *Bulker {
b := &Bulker{
ch: make(chan []interface{}),
ch: make(chan []contracts.Entity),
ctx: ctx,
mu: sync.Mutex{},
}
@ -27,14 +28,14 @@ func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker {
}
// Bulk returns the channel on which the bulks are delivered.
func (b *Bulker) Bulk() <-chan []interface{} {
func (b *Bulker) Bulk() <-chan []contracts.Entity {
return b.ch
}
func (b *Bulker) run(ch <-chan interface{}, count int) {
func (b *Bulker) run(ch <-chan contracts.Entity, count int) {
defer close(b.ch)
bufCh := make(chan interface{}, count)
bufCh := make(chan contracts.Entity, count)
g, ctx := errgroup.WithContext(b.ctx)
g.Go(func() error {
@ -56,7 +57,7 @@ func (b *Bulker) run(ch <-chan interface{}, count int) {
g.Go(func() error {
for done := false; !done; {
buf := make([]interface{}, 0, count)
buf := make([]contracts.Entity, 0, count)
timeout := time.After(256 * time.Millisecond)
for drain := true; drain && len(buf) < count; {
@ -90,6 +91,6 @@ func (b *Bulker) run(ch <-chan interface{}, count int) {
_ = g.Wait()
}
func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} {
func Bulk(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity {
return NewBulker(ctx, ch, count).Bulk()
}

View file

@ -158,7 +158,9 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i
return g.Wait()
}
func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error {
func (db DB) NamedBulkExec(
ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity,
) error {
var cnt com.Counter
g, ctx := errgroup.WithContext(ctx)
bulk := com.Bulk(ctx, arg, count)
@ -186,7 +188,7 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr
return err
}
g.Go(func(b []interface{}) func() error {
g.Go(func(b []contracts.Entity) func() error {
return func() error {
defer sem.Release(1)
@ -219,7 +221,9 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr
return g.Wait()
}
func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error {
func (db DB) NamedBulkExecTx(
ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity,
) error {
var cnt com.Counter
g, ctx := errgroup.WithContext(ctx)
bulk := com.Bulk(ctx, arg, count)
@ -243,7 +247,7 @@ func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concu
return err
}
g.Go(func(b []interface{}) func() error {
g.Go(func(b []contracts.Entity) func() error {
return func() error {
defer sem.Release(1)
@ -335,7 +339,7 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error
return nil
}
// Buffer of one because we receive an entity and send it back immediately.
inserts := make(chan interface{}, 1)
inserts := make(chan contracts.Entity, 1)
inserts <- entity
go func() {
@ -360,7 +364,7 @@ func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error
return nil
}
// Buffer of one because we receive an entity and send it back immediately.
updates := make(chan interface{}, 1)
updates := make(chan contracts.Entity, 1)
updates <- entity
go func() {