Throw BulkChunkSplitPolicy#Reset() away

refs #136
This commit is contained in:
Alexander A. Klimov 2021-11-30 16:56:44 +01:00
parent 10a70e8b71
commit 8da164c50f

View file

@ -14,9 +14,6 @@ type BulkChunkSplitPolicy interface {
// Output true indicates that the state machine was reset first and the bulker
// shall finish the current chunk now (not e.g. once $size is reached) without the given item.
Track(contracts.Entity) bool
// Reset resets the state machine.
Reset()
}
type BulkChunkSplitPolicyFactory func() BulkChunkSplitPolicy
@ -36,9 +33,6 @@ func (neverSplit) Track(contracts.Entity) bool {
return false
}
func (neverSplit) Reset() {
}
// splitOnDupId is a state machine which tracks the inputs' IDs.
// Once an already seen input arrives, it demands splitting.
type splitOnDupId struct {
@ -58,10 +52,6 @@ func (sodi *splitOnDupId) Track(entity contracts.Entity) bool {
return ok
}
func (sodi *splitOnDupId) Reset() {
sodi.seenIds = map[string]struct{}{}
}
// EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel.
type EntityBulker struct {
ch chan []contracts.Entity
@ -149,7 +139,7 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int, splitPolicyFac
b.ch <- buf
}
splitPolicy.Reset()
splitPolicy = splitPolicyFactory()
}
return nil