diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go index 168fa29b..bff50857 100644 --- a/pkg/com/entity_bulker.go +++ b/pkg/com/entity_bulker.go @@ -14,9 +14,6 @@ type BulkChunkSplitPolicy interface { // Output true indicates that the state machine was reset first and the bulker // shall finish the current chunk now (not e.g. once $size is reached) without the given item. Track(contracts.Entity) bool - - // Reset resets the state machine. - Reset() } type BulkChunkSplitPolicyFactory func() BulkChunkSplitPolicy @@ -36,9 +33,6 @@ func (neverSplit) Track(contracts.Entity) bool { return false } -func (neverSplit) Reset() { -} - // splitOnDupId is a state machine which tracks the inputs' IDs. // Once an already seen input arrives, it demands splitting. type splitOnDupId struct { @@ -58,10 +52,6 @@ func (sodi *splitOnDupId) Track(entity contracts.Entity) bool { return ok } -func (sodi *splitOnDupId) Reset() { - sodi.seenIds = map[string]struct{}{} -} - // EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel. type EntityBulker struct { ch chan []contracts.Entity @@ -149,7 +139,7 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int, splitPolicyFac b.ch <- buf } - splitPolicy.Reset() + splitPolicy = splitPolicyFactory() } return nil