From b13d2c3cd778f1db2cb95052ea70c8e1e705b947 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 18 Mar 2021 14:06:25 +0100 Subject: [PATCH] Restrict Bulker to contracts.Entity --- pkg/com/bulker.go | 17 +++++++++-------- pkg/icingadb/db.go | 16 ++++++++++------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index b13e680e..3118e80a 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -2,21 +2,22 @@ package com import ( "context" + "github.com/icinga/icingadb/pkg/contracts" "golang.org/x/sync/errgroup" "sync" "time" ) type Bulker struct { - ch chan []interface{} + ch chan []contracts.Entity ctx context.Context mu sync.Mutex err error } -func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { +func NewBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *Bulker { b := &Bulker{ - ch: make(chan []interface{}), + ch: make(chan []contracts.Entity), ctx: ctx, mu: sync.Mutex{}, } @@ -27,14 +28,14 @@ func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { } // Bulk returns the channel on which the bulks are delivered. -func (b *Bulker) Bulk() <-chan []interface{} { +func (b *Bulker) Bulk() <-chan []contracts.Entity { return b.ch } -func (b *Bulker) run(ch <-chan interface{}, count int) { +func (b *Bulker) run(ch <-chan contracts.Entity, count int) { defer close(b.ch) - bufCh := make(chan interface{}, count) + bufCh := make(chan contracts.Entity, count) g, ctx := errgroup.WithContext(b.ctx) g.Go(func() error { @@ -56,7 +57,7 @@ func (b *Bulker) run(ch <-chan interface{}, count int) { g.Go(func() error { for done := false; !done; { - buf := make([]interface{}, 0, count) + buf := make([]contracts.Entity, 0, count) timeout := time.After(256 * time.Millisecond) for drain := true; drain && len(buf) < count; { @@ -90,6 +91,6 @@ func (b *Bulker) run(ch <-chan interface{}, count int) { _ = g.Wait() } -func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} { +func Bulk(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity { return NewBulker(ctx, ch, count).Bulk() } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 6e542a1c..c6306a41 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -158,7 +158,9 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i return g.Wait() } -func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error { +func (db DB) NamedBulkExec( + ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, +) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) bulk := com.Bulk(ctx, arg, count) @@ -186,7 +188,7 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr return err } - g.Go(func(b []interface{}) func() error { + g.Go(func(b []contracts.Entity) func() error { return func() error { defer sem.Release(1) @@ -219,7 +221,9 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr return g.Wait() } -func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error { +func (db DB) NamedBulkExecTx( + ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, +) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) bulk := com.Bulk(ctx, arg, count) @@ -243,7 +247,7 @@ func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concu return err } - g.Go(func(b []interface{}) func() error { + g.Go(func(b []contracts.Entity) func() error { return func() error { defer sem.Release(1) @@ -335,7 +339,7 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error return nil } // Buffer of one because we receive an entity and send it back immediately. - inserts := make(chan interface{}, 1) + inserts := make(chan contracts.Entity, 1) inserts <- entity go func() { @@ -360,7 +364,7 @@ func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error return nil } // Buffer of one because we receive an entity and send it back immediately. - updates := make(chan interface{}, 1) + updates := make(chan contracts.Entity, 1) updates <- entity go func() {