diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index a08912f5..dd0b254b 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -2,11 +2,48 @@ package com import ( "context" + "github.com/icinga/icingadb/pkg/contracts" "golang.org/x/sync/errgroup" "sync" "time" ) +// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. +// A call takes an item for the current chunk into account. +// Output true indicates that the state machine was reset first and the bulker +// shall finish the current chunk now (not e.g. once $size is reached) without the given item. +type BulkChunkSplitPolicy[T any] func(T) bool + +type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T] + +// NeverSplit returns a pseudo state machine which never demands splitting. +func NeverSplit[T any]() BulkChunkSplitPolicy[T] { + return neverSplit[T] +} + +// SplitOnDupId returns a state machine which tracks the inputs' IDs. +// Once an already seen input arrives, it demands splitting. +func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] { + seenIds := map[string]struct{}{} + + return func(ider T) bool { + id := ider.ID().String() + + _, ok := seenIds[id] + if ok { + seenIds = map[string]struct{}{id: {}} + } else { + seenIds[id] = struct{}{} + } + + return ok + } +} + +func neverSplit[T any](T) bool { + return false +} + // Bulker reads all values from a channel and streams them in chunks into a Bulk channel. type Bulker[T any] struct { ch chan []T @@ -15,14 +52,16 @@ type Bulker[T any] struct { } // NewBulker returns a new Bulker and starts streaming. -func NewBulker[T any](ctx context.Context, ch <-chan T, count int) *Bulker[T] { +func NewBulker[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) *Bulker[T] { b := &Bulker[T]{ ch: make(chan []T), ctx: ctx, mu: sync.Mutex{}, } - go b.run(ch, count) + go b.run(ch, count, splitPolicyFactory) return b } @@ -32,10 +71,11 @@ func (b *Bulker[T]) Bulk() <-chan []T { return b.ch } -func (b *Bulker[T]) run(ch <-chan T, count int) { +func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) { defer close(b.ch) bufCh := make(chan T, count) + splitPolicy := splitPolicyFactory() g, ctx := errgroup.WithContext(b.ctx) g.Go(func() error { @@ -70,6 +110,15 @@ func (b *Bulker[T]) run(ch <-chan T, count int) { break } + if splitPolicy(v) { + if len(buf) > 0 { + b.ch <- buf + buf = make([]T, 0, count) + } + + timeout = time.After(256 * time.Millisecond) + } + buf = append(buf, v) case <-timeout: drain = false @@ -81,6 +130,8 @@ func (b *Bulker[T]) run(ch <-chan T, count int) { if len(buf) > 0 { b.ch <- buf } + + splitPolicy = splitPolicyFactory() } return nil @@ -92,16 +143,18 @@ func (b *Bulker[T]) run(ch <-chan T, count int) { } // Bulk reads all values from a channel and streams them in chunks into a returned channel. -func Bulk[T any](ctx context.Context, ch <-chan T, count int) <-chan []T { +func Bulk[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) <-chan []T { if count <= 1 { return oneBulk(ctx, ch) } - return NewBulker(ctx, ch, count).Bulk() + return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk() } -// oneBulk operates just as NewBulker(ctx, ch, 1).Bulk(), -// but without the overhead of the actual bulk creation with a buffer channel and timeout. +// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(), +// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy. func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T { out := make(chan []T) go func() { @@ -123,3 +176,8 @@ func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T { return out } + +var ( + _ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}] + _ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity] +) diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go deleted file mode 100644 index 0396ec09..00000000 --- a/pkg/com/entity_bulker.go +++ /dev/null @@ -1,183 +0,0 @@ -package com - -import ( - "context" - "github.com/icinga/icingadb/pkg/contracts" - "golang.org/x/sync/errgroup" - "sync" - "time" -) - -// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. -// A call takes an item for the current chunk into account. -// Output true indicates that the state machine was reset first and the bulker -// shall finish the current chunk now (not e.g. once $size is reached) without the given item. -type BulkChunkSplitPolicy func(contracts.Entity) bool - -type BulkChunkSplitPolicyFactory func() BulkChunkSplitPolicy - -// NeverSplit returns a pseudo state machine which never demands splitting. -func NeverSplit() BulkChunkSplitPolicy { - return neverSplit -} - -// SplitOnDupId returns a state machine which tracks the inputs' IDs. -// Once an already seen input arrives, it demands splitting. -func SplitOnDupId() BulkChunkSplitPolicy { - seenIds := map[string]struct{}{} - - return func(entity contracts.Entity) bool { - id := entity.ID().String() - - _, ok := seenIds[id] - if ok { - seenIds = map[string]struct{}{id: {}} - } else { - seenIds[id] = struct{}{} - } - - return ok - } -} - -func neverSplit(contracts.Entity) bool { - return false -} - -// EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel. -type EntityBulker struct { - ch chan []contracts.Entity - ctx context.Context - mu sync.Mutex -} - -// NewEntityBulker returns a new EntityBulker and starts streaming. -func NewEntityBulker( - ctx context.Context, ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory, -) *EntityBulker { - b := &EntityBulker{ - ch: make(chan []contracts.Entity), - ctx: ctx, - mu: sync.Mutex{}, - } - - go b.run(ch, count, splitPolicyFactory) - - return b -} - -// Bulk returns the channel on which the bulks are delivered. -func (b *EntityBulker) Bulk() <-chan []contracts.Entity { - return b.ch -} - -func (b *EntityBulker) run(ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory) { - defer close(b.ch) - - bufCh := make(chan contracts.Entity, count) - splitPolicy := splitPolicyFactory() - g, ctx := errgroup.WithContext(b.ctx) - - g.Go(func() error { - defer close(bufCh) - - for { - select { - case v, ok := <-ch: - if !ok { - return nil - } - - bufCh <- v - case <-ctx.Done(): - return ctx.Err() - } - } - }) - - g.Go(func() error { - for done := false; !done; { - buf := make([]contracts.Entity, 0, count) - timeout := time.After(256 * time.Millisecond) - - for drain := true; drain && len(buf) < count; { - select { - case v, ok := <-bufCh: - if !ok { - drain = false - done = true - - break - } - - if splitPolicy(v) { - if len(buf) > 0 { - b.ch <- buf - buf = make([]contracts.Entity, 0, count) - } - - timeout = time.After(256 * time.Millisecond) - } - - buf = append(buf, v) - case <-timeout: - drain = false - case <-ctx.Done(): - return ctx.Err() - } - } - - if len(buf) > 0 { - b.ch <- buf - } - - splitPolicy = splitPolicyFactory() - } - - return nil - }) - - // We don't expect an error here. - // We only use errgroup for the encapsulated use of sync.WaitGroup. - _ = g.Wait() -} - -// BulkEntities reads all entities from a channel and streams them in chunks into a returned channel. -func BulkEntities( - ctx context.Context, ch <-chan contracts.Entity, count int, splitPolicyFactory BulkChunkSplitPolicyFactory, -) <-chan []contracts.Entity { - if count <= 1 { - return oneEntityBulk(ctx, ch) - } - - return NewEntityBulker(ctx, ch, count, splitPolicyFactory).Bulk() -} - -// oneEntityBulk operates just as NewEntityBulker(ctx, ch, 1, splitPolicy).Bulk(), -// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy. -func oneEntityBulk(ctx context.Context, ch <-chan contracts.Entity) <-chan []contracts.Entity { - out := make(chan []contracts.Entity) - go func() { - defer close(out) - - for { - select { - case item := <-ch: - select { - case out <- []contracts.Entity{item}: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } - } - }() - - return out -} - -var ( - _ BulkChunkSplitPolicyFactory = NeverSplit - _ BulkChunkSplitPolicyFactory = SplitOnDupId -) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 500d0e39..ba631350 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -240,7 +240,7 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph g, ctx := errgroup.WithContext(ctx) // Use context from group. - bulk := com.Bulk(ctx, arg, count) + bulk := com.Bulk(ctx, arg, count, com.NeverSplit[any]) g.Go(func() error { g, ctx := errgroup.WithContext(ctx) @@ -304,14 +304,14 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph // and can be executed concurrently to the extent allowed by the semaphore passed in sem. // Entities for which the query ran successfully will be streamed on the succeeded channel. func (db *DB) NamedBulkExec( - ctx context.Context, query string, count int, sem *semaphore.Weighted, - arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory, + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, + succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], ) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() g, ctx := errgroup.WithContext(ctx) - bulk := com.BulkEntities(ctx, arg, count, splitPolicyFactory) + bulk := com.Bulk(ctx, arg, count, splitPolicyFactory) g.Go(func() error { for { @@ -379,7 +379,7 @@ func (db *DB) NamedBulkExecTx( defer db.log(ctx, query, &counter).Stop() g, ctx := errgroup.WithContext(ctx) - bulk := com.BulkEntities(ctx, arg, count, com.NeverSplit) + bulk := com.Bulk(ctx, arg, count, com.NeverSplit[contracts.Entity]) g.Go(func() error { for { @@ -502,7 +502,9 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti sem := db.GetSemaphoreForTable(utils.TableName(first)) stmt, placeholders := db.BuildInsertStmt(first) - return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit) + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit[contracts.Entity], + ) } // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. @@ -519,7 +521,8 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti stmt, placeholders := db.BuildUpsertStmt(first) return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded, com.SplitOnDupId, + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, succeeded, com.SplitOnDupId[contracts.Entity], ) } diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index bd586fc5..e1d6160b 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -141,7 +141,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi } }).Stop() - bulks := com.Bulk(ctx, input, s.redis.Options.HScanCount) + bulks := com.Bulk(ctx, input, s.redis.Options.HScanCount, com.NeverSplit[redis.XMessage]) stream := "icinga:history:stream:" + key for { select { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 1a3ee8af..636c0bb6 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -109,7 +109,7 @@ func (r *RuntimeUpdates) Sync( sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId, + ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId[contracts.Entity], ) }) g.Go(func() error { @@ -213,7 +213,7 @@ func (r *RuntimeUpdates) Sync( sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId, + ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId[contracts.Entity], ) }) g.Go(func() error { @@ -248,7 +248,8 @@ func (r *RuntimeUpdates) Sync( sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, com.SplitOnDupId, + ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, + upsertedFlatCustomvars, com.SplitOnDupId[contracts.Entity], ) }) g.Go(func() error {