Merge Bulker and EntityBulker

This commit is contained in:
Alexander A. Klimov 2022-04-04 16:45:49 +02:00
parent 5b3a5cc163
commit 7b352c52db
5 changed files with 80 additions and 201 deletions

View file

@ -2,11 +2,48 @@ package com
import (
"context"
"github.com/icinga/icingadb/pkg/contracts"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles.
// A call takes an item for the current chunk into account.
// Output true indicates that the state machine was reset first and the bulker
// shall finish the current chunk now (i.e. before the regular chunk size is reached)
// without the given item. The given item then becomes the first one of the next chunk.
type BulkChunkSplitPolicy[T any] func(T) bool
// BulkChunkSplitPolicyFactory creates a new BulkChunkSplitPolicy with fresh state.
// A bulker calls it when it starts and again after every chunk it has delivered,
// so a policy never carries state over from one regular chunk to the next.
type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T]
// NeverSplit returns a pseudo state machine which never demands splitting.
// The returned policy keeps no state: it is the package-level neverSplit function
// instantiated for T, which reports false for every item.
func NeverSplit[T any]() BulkChunkSplitPolicy[T] {
return neverSplit[T]
}
// SplitOnDupId returns a state machine which remembers the ID of every item it is given.
// As soon as an item arrives whose ID was already seen, the policy demands splitting (returns true)
// and forgets all IDs except the one of that item, as it opens the next chunk.
func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] {
	known := make(map[string]struct{})

	return func(item T) bool {
		key := item.ID().String()

		if _, dup := known[key]; dup {
			// Start tracking from scratch: the duplicate is the first item of the next chunk.
			known = map[string]struct{}{key: {}}
			return true
		}

		known[key] = struct{}{}

		return false
	}
}
// neverSplit is the stateless BulkChunkSplitPolicy behind NeverSplit.
// It ignores its argument and always reports that no split is required.
func neverSplit[T any](_ T) bool {
	const split = false

	return split
}
// Bulker reads all values from a channel and streams them in chunks into a Bulk channel.
type Bulker[T any] struct {
ch chan []T
@ -15,14 +52,16 @@ type Bulker[T any] struct {
}
// NewBulker returns a new Bulker and starts streaming.
func NewBulker[T any](ctx context.Context, ch <-chan T, count int) *Bulker[T] {
func NewBulker[T any](
ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
) *Bulker[T] {
b := &Bulker[T]{
ch: make(chan []T),
ctx: ctx,
mu: sync.Mutex{},
}
go b.run(ch, count)
go b.run(ch, count, splitPolicyFactory)
return b
}
@ -32,10 +71,11 @@ func (b *Bulker[T]) Bulk() <-chan []T {
return b.ch
}
func (b *Bulker[T]) run(ch <-chan T, count int) {
func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) {
defer close(b.ch)
bufCh := make(chan T, count)
splitPolicy := splitPolicyFactory()
g, ctx := errgroup.WithContext(b.ctx)
g.Go(func() error {
@ -70,6 +110,15 @@ func (b *Bulker[T]) run(ch <-chan T, count int) {
break
}
if splitPolicy(v) {
if len(buf) > 0 {
b.ch <- buf
buf = make([]T, 0, count)
}
timeout = time.After(256 * time.Millisecond)
}
buf = append(buf, v)
case <-timeout:
drain = false
@ -81,6 +130,8 @@ func (b *Bulker[T]) run(ch <-chan T, count int) {
if len(buf) > 0 {
b.ch <- buf
}
splitPolicy = splitPolicyFactory()
}
return nil
@ -92,16 +143,18 @@ func (b *Bulker[T]) run(ch <-chan T, count int) {
}
// Bulk reads all values from a channel and streams them in chunks into a returned channel.
func Bulk[T any](ctx context.Context, ch <-chan T, count int) <-chan []T {
func Bulk[T any](
ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
) <-chan []T {
if count <= 1 {
return oneBulk(ctx, ch)
}
return NewBulker(ctx, ch, count).Bulk()
return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk()
}
// oneBulk operates just as NewBulker(ctx, ch, 1).Bulk(),
// but without the overhead of the actual bulk creation with a buffer channel and timeout.
// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(),
// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy.
func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T {
out := make(chan []T)
go func() {
@ -123,3 +176,8 @@ func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T {
return out
}
// Compile-time assertions that the policy constructors provided by this package
// can be used wherever a BulkChunkSplitPolicyFactory is expected.
var (
_ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}]
_ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity]
)

View file

@ -1,183 +0,0 @@
package com
import (
"context"
"github.com/icinga/icingadb/pkg/contracts"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles.
// A call takes an item for the current chunk into account.
// Output true indicates that the state machine was reset first and the bulker
// shall finish the current chunk now (i.e. before the regular chunk size is reached)
// without the given item. The given item then becomes the first one of the next chunk.
type BulkChunkSplitPolicy func(contracts.Entity) bool
// BulkChunkSplitPolicyFactory creates a new BulkChunkSplitPolicy with fresh state.
// The bulker calls it when it starts and again after every chunk it has delivered.
type BulkChunkSplitPolicyFactory func() BulkChunkSplitPolicy
// NeverSplit returns a pseudo state machine which never demands splitting.
// The returned policy keeps no state: it is the package-level neverSplit function,
// which reports false for every entity.
func NeverSplit() BulkChunkSplitPolicy {
return neverSplit
}
// SplitOnDupId returns a state machine which remembers the ID of every entity it is given.
// As soon as an entity arrives whose ID was already seen, the policy demands splitting (returns true)
// and forgets all IDs except the one of that entity, as it opens the next chunk.
func SplitOnDupId() BulkChunkSplitPolicy {
	known := make(map[string]struct{})

	return func(item contracts.Entity) bool {
		key := item.ID().String()

		if _, dup := known[key]; dup {
			// Start tracking from scratch: the duplicate is the first entity of the next chunk.
			known = map[string]struct{}{key: {}}
			return true
		}

		known[key] = struct{}{}

		return false
	}
}
// neverSplit is the stateless BulkChunkSplitPolicy behind NeverSplit.
// It ignores the entity and always reports that no split is required.
func neverSplit(_ contracts.Entity) bool {
	const split = false

	return split
}
// EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel.
// Instances are created via NewEntityBulker, which also starts the streaming goroutine.
type EntityBulker struct {
// ch delivers the assembled chunks; it is exposed read-only via Bulk and closed by run when streaming ends.
ch chan []contracts.Entity
// ctx is the context passed to NewEntityBulker; run derives its working context from it.
ctx context.Context
// NOTE(review): mu is initialized by NewEntityBulker but not referenced by any method in this file —
// confirm whether it is still needed.
mu sync.Mutex
}
// NewEntityBulker creates an EntityBulker and immediately starts streaming in a separate goroutine:
// entities read from ch are assembled into chunks of at most count items, which are delivered
// on the channel returned by Bulk. splitPolicyFactory supplies the policy that may end a chunk early.
func NewEntityBulker(
	ctx context.Context, ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory,
) *EntityBulker {
	bulker := new(EntityBulker)
	bulker.ch = make(chan []contracts.Entity)
	bulker.ctx = ctx
	// bulker.mu needs no explicit initialization, the zero value of sync.Mutex is an unlocked mutex.

	go bulker.run(ch, count, splitPolicyFactory)

	return bulker
}
// Bulk returns the channel on which the bulks are delivered.
// The channel is receive-only for callers and gets closed once the bulker has finished streaming.
func (b *EntityBulker) Bulk() <-chan []contracts.Entity {
return b.ch
}
// run is the streaming loop of the bulker, started in its own goroutine by NewEntityBulker.
// It closes b.ch when it returns.
//
// Two goroutines cooperate:
//   - The first forwards every entity from ch into an internal buffer channel of capacity count
//     and closes that buffer once ch is closed.
//   - The second assembles chunks from the buffer. A chunk is delivered on b.ch when it holds
//     count entities, when 256ms have passed since the chunk was started, when the split policy
//     demands it, or when the buffer is closed. After each delivered chunk a fresh split policy
//     is obtained from splitPolicyFactory.
//
// Every blocking channel send also watches the context, so both goroutines terminate
// (and b.ch gets closed) once the context is done, even if nobody consumes b.ch anymore.
func (b *EntityBulker) run(ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory) {
	defer close(b.ch)

	bufCh := make(chan contracts.Entity, count)
	splitPolicy := splitPolicyFactory()
	g, ctx := errgroup.WithContext(b.ctx)

	g.Go(func() error {
		defer close(bufCh)

		for {
			select {
			case v, ok := <-ch:
				if !ok {
					return nil
				}

				// Don't block forever on a full buffer if the chunk assembler is already gone.
				select {
				case bufCh <- v:
				case <-ctx.Done():
					return ctx.Err()
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	g.Go(func() error {
		// deliver hands a chunk to the consumer unless the context is done first.
		deliver := func(chunk []contracts.Entity) error {
			select {
			case b.ch <- chunk:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		for done := false; !done; {
			buf := make([]contracts.Entity, 0, count)
			timeout := time.After(256 * time.Millisecond)

			for drain := true; drain && len(buf) < count; {
				select {
				case v, ok := <-bufCh:
					if !ok {
						// Input exhausted: leave the drain loop, flush what is left and stop.
						drain = false
						done = true

						break
					}

					if splitPolicy(v) {
						// The policy has reset itself; v must open a new chunk.
						if len(buf) > 0 {
							if err := deliver(buf); err != nil {
								return err
							}

							buf = make([]contracts.Entity, 0, count)
						}

						timeout = time.After(256 * time.Millisecond)
					}

					buf = append(buf, v)
				case <-timeout:
					drain = false
				case <-ctx.Done():
					return ctx.Err()
				}
			}

			if len(buf) > 0 {
				if err := deliver(buf); err != nil {
					return err
				}
			}

			splitPolicy = splitPolicyFactory()
		}

		return nil
	})

	// The only error possible here is the context's one, which callers observe via their own context.
	// We only use errgroup for the encapsulated use of sync.WaitGroup.
	_ = g.Wait()
}
// BulkEntities reads all entities from a channel and streams them in chunks of at most count
// entities into the returned channel. splitPolicyFactory supplies the policy that may end a chunk early;
// it is not consulted when count is 1 or less, as every chunk then consists of a single entity anyway.
func BulkEntities(
	ctx context.Context, ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory,
) <-chan []contracts.Entity {
	if count > 1 {
		return NewEntityBulker(ctx, ch, count, splitPolicyFactory).Bulk()
	}

	// Single-entity chunks don't need buffering, timeouts or a split policy.
	return oneEntityBulk(ctx, ch)
}
// oneEntityBulk operates just as NewEntityBulker(ctx, ch, 1, splitPolicy).Bulk(),
// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy.
//
// Every entity received from ch is forwarded as a single-element chunk on the returned channel.
// The returned channel is closed once ch is closed or ctx is done, whichever happens first.
func oneEntityBulk(ctx context.Context, ch <-chan contracts.Entity) <-chan []contracts.Entity {
	out := make(chan []contracts.Entity)

	go func() {
		defer close(out)

		for {
			select {
			case item, ok := <-ch:
				if !ok {
					// Input exhausted. Without this check a closed channel would yield
					// an endless stream of chunks containing the zero value (nil entity).
					return
				}

				select {
				case out <- []contracts.Entity{item}:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return out
}
// Compile-time assertions that the policy constructors provided by this package
// can be used wherever a BulkChunkSplitPolicyFactory is expected.
var (
_ BulkChunkSplitPolicyFactory = NeverSplit
_ BulkChunkSplitPolicyFactory = SplitOnDupId
)

View file

@ -240,7 +240,7 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
g, ctx := errgroup.WithContext(ctx)
// Use context from group.
bulk := com.Bulk(ctx, arg, count)
bulk := com.Bulk(ctx, arg, count, com.NeverSplit[any])
g.Go(func() error {
g, ctx := errgroup.WithContext(ctx)
@ -304,14 +304,14 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
// Entities for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory,
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity],
) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count, splitPolicyFactory)
bulk := com.Bulk(ctx, arg, count, splitPolicyFactory)
g.Go(func() error {
for {
@ -379,7 +379,7 @@ func (db *DB) NamedBulkExecTx(
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count, com.NeverSplit)
bulk := com.Bulk(ctx, arg, count, com.NeverSplit[contracts.Entity])
g.Go(func() error {
for {
@ -502,7 +502,9 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildInsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit[contracts.Entity],
)
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
@ -519,7 +521,8 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
stmt, placeholders := db.BuildUpsertStmt(first)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded, com.SplitOnDupId,
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, succeeded, com.SplitOnDupId[contracts.Entity],
)
}

View file

@ -141,7 +141,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
}
}).Stop()
bulks := com.Bulk(ctx, input, s.redis.Options.HScanCount)
bulks := com.Bulk(ctx, input, s.redis.Options.HScanCount, com.NeverSplit[redis.XMessage])
stream := "icinga:history:stream:" + key
for {
select {

View file

@ -109,7 +109,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId,
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId[contracts.Entity],
)
})
g.Go(func() error {
@ -213,7 +213,7 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId,
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId[contracts.Entity],
)
})
g.Go(func() error {
@ -248,7 +248,8 @@ func (r *RuntimeUpdates) Sync(
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, com.SplitOnDupId,
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars,
upsertedFlatCustomvars, com.SplitOnDupId[contracts.Entity],
)
})
g.Go(func() error {