diff --git a/json-decoder.go b/json-decoder.go index 8bb51533..ba9f21df 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -4,9 +4,6 @@ import ( "encoding/json" ) -// Number of workers DecodePool uses -var poolSize = 16 - type JsonDecodePackage struct{ // Json strings from Redis ChecksumsRaw string @@ -29,36 +26,14 @@ func decodeString(toDecode string) (map[string]interface{}, error) { return unJson.(map[string]interface{}), nil } -// decodePool takes a channel it receives JsonDecodePackages from. These packages are decoded -// by a pool of workers which send their result back through their own channel. Returns error -// if any. -func DecodePool(chInput <-chan JsonDecodePackage) error { - consumers := make([]chan JsonDecodePackage, poolSize) - for i := range consumers { - consumers[i] = make(chan JsonDecodePackage) - go decodePackage(consumers[i]) +// decodePool takes a channel it receives JsonDecodePackages from and an error channel to forward errors. +// These packages are decoded by a pool of pollSize workers which send their result back through their own channel. +func DecodePool(chInput <-chan JsonDecodePackage, chError chan error, poolSize int) { + for i := 0; i < poolSize; i++ { + go func(in <-chan JsonDecodePackage, chErrorInternal chan error) { + chErrorInternal <- decodePackage(in) + }(chInput, chError) } - - distribution := func(ch <-chan JsonDecodePackage, consumers []chan JsonDecodePackage) { - defer func(providers []chan JsonDecodePackage) { - for _, channel := range providers { - close(channel) - } - }(consumers) - - for { - for _, consumer := range consumers { - select { - case packag := <- ch: - consumer <- packag - } - } - } - } - - go distribution(chInput, consumers) - - return nil } // decodePackage is the worker function for DecodePool. Reads from a channel and sends back decoded @@ -66,14 +41,13 @@ func DecodePool(chInput <-chan JsonDecodePackage) error { func decodePackage(chInput <-chan JsonDecodePackage) error { var err error - for input := range chInput{ - if input.ChecksumsProcessed == nil { + if input.ChecksumsProcessed == nil && input.ChecksumsRaw != "" { if input.ChecksumsProcessed, err = decodeString(input.ChecksumsRaw); err != nil { return err } } - if input.ConfigProcessed == nil { + if input.ConfigProcessed == nil && input.ConfigRaw != ""{ if input.ConfigProcessed, err = decodeString(input.ConfigRaw); err != nil { return err } diff --git a/json-decoder_test.go b/json-decoder_test.go index b377a331..b1519b2e 100644 --- a/json-decoder_test.go +++ b/json-decoder_test.go @@ -23,6 +23,7 @@ func Test_decodeString(t *testing.T) { func Test_DecodePool(t *testing.T) { var chInput = make(chan JsonDecodePackage) var chOutput = make(chan JsonDecodePackage) + var chError = make(chan error) var TestPackageA = JsonDecodePackage{ "{\"action_url_id\":\"761ff24e252d57581a7de5d9f417f717fb3c2d7f\",\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"e9fea9581588f18cfb46969268a94166bd0474ae\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"330c09556cbb5e01c180343bb669a2d36b48dd2c\",\"name_checksum\":\"9f75a6ea3ea6f1692538c865133a8a08e48f06d5\",\"notes_url_id\":\"31bb5f9a69c659270e2bcd257b77353669c04d1e\",\"properties_checksum\":\"caff92fafc9a17097304f6c2fb9fa029d3ec8aa8\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}", @@ -40,9 +41,8 @@ func Test_DecodePool(t *testing.T) { &chOutput, } - go func() { - assert.NoError(t, DecodePool(chInput)) - }() + DecodePool(chInput, chError, 4) + chInput <- TestPackageA chInput <- TestPackageB