Rewrite pool

This commit is contained in:
Jean Flach 2019-03-04 16:24:11 +01:00
parent a5568db958
commit e5b439eab9
2 changed files with 12 additions and 38 deletions

View file

@ -4,9 +4,6 @@ import (
"encoding/json"
)
// Number of workers DecodePool uses
var poolSize = 16
type JsonDecodePackage struct{
// Json strings from Redis
ChecksumsRaw string
@ -29,36 +26,14 @@ func decodeString(toDecode string) (map[string]interface{}, error) {
return unJson.(map[string]interface{}), nil
}
// decodePool takes a channel it receives JsonDecodePackages from. These packages are decoded
// by a pool of workers which send their result back through their own channel. Returns error
// if any.
func DecodePool(chInput <-chan JsonDecodePackage) error {
consumers := make([]chan JsonDecodePackage, poolSize)
for i := range consumers {
consumers[i] = make(chan JsonDecodePackage)
go decodePackage(consumers[i])
// decodePool takes a channel it receives JsonDecodePackages from and an error channel to forward errors.
// These packages are decoded by a pool of pollSize workers which send their result back through their own channel.
func DecodePool(chInput <-chan JsonDecodePackage, chError chan error, poolSize int) {
for i := 0; i < poolSize; i++ {
go func(in <-chan JsonDecodePackage, chErrorInternal chan error) {
chErrorInternal <- decodePackage(in)
}(chInput, chError)
}
distribution := func(ch <-chan JsonDecodePackage, consumers []chan JsonDecodePackage) {
defer func(providers []chan JsonDecodePackage) {
for _, channel := range providers {
close(channel)
}
}(consumers)
for {
for _, consumer := range consumers {
select {
case packag := <- ch:
consumer <- packag
}
}
}
}
go distribution(chInput, consumers)
return nil
}
// decodePackage is the worker function for DecodePool. Reads from a channel and sends back decoded
@ -66,14 +41,13 @@ func DecodePool(chInput <-chan JsonDecodePackage) error {
func decodePackage(chInput <-chan JsonDecodePackage) error {
var err error
for input := range chInput{
if input.ChecksumsProcessed == nil {
if input.ChecksumsProcessed == nil && input.ChecksumsRaw != "" {
if input.ChecksumsProcessed, err = decodeString(input.ChecksumsRaw); err != nil {
return err
}
}
if input.ConfigProcessed == nil {
if input.ConfigProcessed == nil && input.ConfigRaw != ""{
if input.ConfigProcessed, err = decodeString(input.ConfigRaw); err != nil {
return err
}

View file

@ -23,6 +23,7 @@ func Test_decodeString(t *testing.T) {
func Test_DecodePool(t *testing.T) {
var chInput = make(chan JsonDecodePackage)
var chOutput = make(chan JsonDecodePackage)
var chError = make(chan error)
var TestPackageA = JsonDecodePackage{
"{\"action_url_id\":\"761ff24e252d57581a7de5d9f417f717fb3c2d7f\",\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"e9fea9581588f18cfb46969268a94166bd0474ae\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"330c09556cbb5e01c180343bb669a2d36b48dd2c\",\"name_checksum\":\"9f75a6ea3ea6f1692538c865133a8a08e48f06d5\",\"notes_url_id\":\"31bb5f9a69c659270e2bcd257b77353669c04d1e\",\"properties_checksum\":\"caff92fafc9a17097304f6c2fb9fa029d3ec8aa8\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}",
@ -40,9 +41,8 @@ func Test_DecodePool(t *testing.T) {
&chOutput,
}
go func() {
assert.NoError(t, DecodePool(chInput))
}()
DecodePool(chInput, chError, 4)
chInput <- TestPackageA
chInput <- TestPackageB