From c2b435499d310abd216679222757fd623884d779 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 4 Mar 2019 11:03:50 +0100 Subject: [PATCH 01/10] Initial commit --- json-decoder.go | 77 ++++++++++++++++++++++++++++++++++++ json-decoder_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 json-decoder.go create mode 100644 json-decoder_test.go diff --git a/json-decoder.go b/json-decoder.go new file mode 100644 index 00000000..83f52696 --- /dev/null +++ b/json-decoder.go @@ -0,0 +1,77 @@ +package icingadb_json_decoder + +import ( + "encoding/json" +) + +var poolSize = 16 + +type JsonDecodePackage struct{ + ChecksumsRaw string + ConfigRaw string + ChecksumsProcessed map[string]interface{} + ConfigProcessed map[string]interface{} + ChBack *chan JsonDecodePackage +} + +func decodeString(toDecode string) (map[string]interface{}, error) { + var unJson interface{} = nil + if err := json.Unmarshal([]byte(toDecode), &unJson); err != nil { + return nil, err + } + + return unJson.(map[string]interface{}), nil +} + +func decodePool(chInput <-chan JsonDecodePackage) error { + consumers := make([]chan JsonDecodePackage, poolSize) + for i := range consumers { + consumers[i] = make(chan JsonDecodePackage) + go DecodePackage(consumers[i]) + } + + distribution := func(ch <-chan JsonDecodePackage, consumers []chan JsonDecodePackage) { + defer func(providers []chan JsonDecodePackage) { + for _, channel := range providers { + close(channel) + } + }(consumers) + + for { + for _, consumer := range consumers { + select { + case packag := <- ch: + consumer <- packag + } + } + } + } + + go distribution(chInput, consumers) + + return nil +} + +func DecodePackage(chInput <-chan JsonDecodePackage) error { + var err error + + + for input := range chInput{ + if input.ChecksumsProcessed == nil { + if input.ChecksumsProcessed, err = decodeString(input.ChecksumsRaw); err != nil { + return err + } + } + if input.ConfigProcessed == nil { + if input.ConfigProcessed, err = decodeString(input.ConfigRaw); err != nil { + return err + } + } + + *input.ChBack <- JsonDecodePackage(input) + + } + + + return nil +} \ No newline at end of file diff --git a/json-decoder_test.go b/json-decoder_test.go new file mode 100644 index 00000000..032569ef --- /dev/null +++ b/json-decoder_test.go @@ -0,0 +1,94 @@ +package icingadb_json_decoder + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + + + +func Test_decodeString(t *testing.T) { + var testCorrect = "{\"Integer\": 2.0, \"String\": \"Test One Two Three\"}" + var testBroken = "{ahahahahaha}" + + ret, err := decodeString(testCorrect) + assert.NoError(t, err) + assert.Equal(t, 2.0, ret["Integer"]) + assert.Equal(t, "Test One Two Three", ret["String"]) + + _, err = decodeString(testBroken) + assert.Error(t, err) +} + +func Test_decodePool(t *testing.T) { + var chInput = make(chan JsonDecodePackage) + var chOutput = make(chan JsonDecodePackage) + + var TestPackageA = JsonDecodePackage{ + "{\"action_url_id\":\"761ff24e252d57581a7de5d9f417f717fb3c2d7f\",\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"e9fea9581588f18cfb46969268a94166bd0474ae\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"330c09556cbb5e01c180343bb669a2d36b48dd2c\",\"name_checksum\":\"9f75a6ea3ea6f1692538c865133a8a08e48f06d5\",\"notes_url_id\":\"31bb5f9a69c659270e2bcd257b77353669c04d1e\",\"properties_checksum\":\"caff92fafc9a17097304f6c2fb9fa029d3ec8aa8\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}", + "{\"active_checks_enabled\":false,\"check_interval\":300.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"dummy\",\"display_name\":\"TestService A - 0.0\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"TestService A - 0.0\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true,\"zone\":\"double\"}", + nil, + nil, + &chOutput, + } + + var TestPackageB = JsonDecodePackage{ + "{\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"efb9e8a4dff9ee330838909403655ae376251dc9\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"7bb83f280fee68146e223b51c02c9ac1e5d56305\",\"name_checksum\":\"92420fe84a880f5b7675ba0fb0f4f730f40a144a\",\"properties_checksum\":\"8563b9113161953acabb7bba779cc5706494eb3b\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}", + "{\"active_checks_enabled\":false,\"check_interval\":300.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"dummy\",\"display_name\":\"TestService B - 0.0\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"TestService B - 0.0\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true,\"zone\":\"double\"}", + nil, + nil, + &chOutput, + } + + go func() { + assert.NoError(t, decodePool(chInput)) + }() + + chInput <- TestPackageA + chInput <- TestPackageB + close(chInput) + + resultA := <-chOutput + resultB := <-chOutput + + assert.NotNil(t, resultA.ConfigProcessed) + assert.NotNil(t, resultB.ConfigProcessed) +} + +func Test_decodePackage(t *testing.T) { + var chInput = make(chan JsonDecodePackage) + var chOutput = make(chan JsonDecodePackage) + + var TestPackageA = JsonDecodePackage{ + "{\"action_url_id\":\"761ff24e252d57581a7de5d9f417f717fb3c2d7f\",\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"e9fea9581588f18cfb46969268a94166bd0474ae\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"330c09556cbb5e01c180343bb669a2d36b48dd2c\",\"name_checksum\":\"9f75a6ea3ea6f1692538c865133a8a08e48f06d5\",\"notes_url_id\":\"31bb5f9a69c659270e2bcd257b77353669c04d1e\",\"properties_checksum\":\"caff92fafc9a17097304f6c2fb9fa029d3ec8aa8\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}", + "{\"active_checks_enabled\":false,\"check_interval\":300.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"dummy\",\"display_name\":\"TestService A - 0.0\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"TestService A - 0.0\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true,\"zone\":\"double\"}", + nil, + nil, + &chOutput, + } + + var TestPackageB = JsonDecodePackage{ + "{\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"efb9e8a4dff9ee330838909403655ae376251dc9\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"7bb83f280fee68146e223b51c02c9ac1e5d56305\",\"name_checksum\":\"92420fe84a880f5b7675ba0fb0f4f730f40a144a\",\"properties_checksum\":\"8563b9113161953acabb7bba779cc5706494eb3b\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}", + "{\"active_checks_enabled\":false,\"check_interval\":300.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"dummy\",\"display_name\":\"TestService B - 0.0\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"TestService B - 0.0\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true,\"zone\":\"double\"}", + nil, + nil, + &chOutput, + } + + go func() { + err := DecodePackage(chInput) + assert.NoError(t, err) + }() + + go func() { + resultA := <-chOutput + resultB := <-chOutput + + assert.NotNil(t, resultA.ConfigProcessed) + assert.NotNil(t, resultB.ConfigProcessed) + }() + + chInput <- TestPackageA + chInput <- TestPackageB + close(chInput) +} \ No newline at end of file From 60541703bfeb9c9001ffe68f0049550d0227c043 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 4 Mar 2019 11:05:40 +0100 Subject: [PATCH 02/10] Close channels --- json-decoder.go | 1 - json-decoder_test.go | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/json-decoder.go b/json-decoder.go index 83f52696..7c24de32 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -72,6 +72,5 @@ func DecodePackage(chInput <-chan JsonDecodePackage) error { } - return nil } \ No newline at end of file diff --git a/json-decoder_test.go b/json-decoder_test.go index 032569ef..5f56df46 100644 --- a/json-decoder_test.go +++ b/json-decoder_test.go @@ -50,6 +50,7 @@ func Test_decodePool(t *testing.T) { resultA := <-chOutput resultB := <-chOutput + close(chOutput) assert.NotNil(t, resultA.ConfigProcessed) assert.NotNil(t, resultB.ConfigProcessed) @@ -91,4 +92,5 @@ func Test_decodePackage(t *testing.T) { chInput <- TestPackageA chInput <- TestPackageB close(chInput) + close(chOutput) } \ No newline at end of file From a5568db95894bd257c0fb7577bd16eb1d09c3d0d Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 4 Mar 2019 11:41:18 +0100 Subject: [PATCH 03/10] Code style and documentation --- json-decoder.go | 17 ++++++++++++++--- json-decoder_test.go | 6 +++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index 7c24de32..8bb51533 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -4,16 +4,22 @@ import ( "encoding/json" ) +// Number of workers DecodePool uses var poolSize = 16 type JsonDecodePackage struct{ + // Json strings from Redis ChecksumsRaw string ConfigRaw string + // When unmarshaled, results will be written here ChecksumsProcessed map[string]interface{} ConfigProcessed map[string]interface{} + // Package will be sent back through this channel ChBack *chan JsonDecodePackage } +// decodeString unmarshals the string toDecode using the json package. Returns the object as a +// map[string]interface and nil if successful, error if not. func decodeString(toDecode string) (map[string]interface{}, error) { var unJson interface{} = nil if err := json.Unmarshal([]byte(toDecode), &unJson); err != nil { @@ -23,11 +29,14 @@ func decodeString(toDecode string) (map[string]interface{}, error) { return unJson.(map[string]interface{}), nil } -func decodePool(chInput <-chan JsonDecodePackage) error { +// decodePool takes a channel it receives JsonDecodePackages from. These packages are decoded +// by a pool of workers which send their result back through their own channel. Returns error +// if any. +func DecodePool(chInput <-chan JsonDecodePackage) error { consumers := make([]chan JsonDecodePackage, poolSize) for i := range consumers { consumers[i] = make(chan JsonDecodePackage) - go DecodePackage(consumers[i]) + go decodePackage(consumers[i]) } distribution := func(ch <-chan JsonDecodePackage, consumers []chan JsonDecodePackage) { @@ -52,7 +61,9 @@ func decodePool(chInput <-chan JsonDecodePackage) error { return nil } -func DecodePackage(chInput <-chan JsonDecodePackage) error { +// decodePackage is the worker function for DecodePool. Reads from a channel and sends back decoded +// packages. Returns error if any. +func decodePackage(chInput <-chan JsonDecodePackage) error { var err error diff --git a/json-decoder_test.go b/json-decoder_test.go index 5f56df46..b377a331 100644 --- a/json-decoder_test.go +++ b/json-decoder_test.go @@ -20,7 +20,7 @@ func Test_decodeString(t *testing.T) { assert.Error(t, err) } -func Test_decodePool(t *testing.T) { +func Test_DecodePool(t *testing.T) { var chInput = make(chan JsonDecodePackage) var chOutput = make(chan JsonDecodePackage) @@ -41,7 +41,7 @@ func Test_decodePool(t *testing.T) { } go func() { - assert.NoError(t, decodePool(chInput)) + assert.NoError(t, DecodePool(chInput)) }() chInput <- TestPackageA @@ -77,7 +77,7 @@ func Test_decodePackage(t *testing.T) { } go func() { - err := DecodePackage(chInput) + err := decodePackage(chInput) assert.NoError(t, err) }() From e5b439eab9fbb1b63f2e8a143b0334db4838a963 Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Mon, 4 Mar 2019 16:24:11 +0100 Subject: [PATCH 04/10] Rewrite pool --- json-decoder.go | 44 +++++++++----------------------------------- json-decoder_test.go | 6 +++--- 2 files changed, 12 insertions(+), 38 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index 8bb51533..ba9f21df 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -4,9 +4,6 @@ import ( "encoding/json" ) -// Number of workers DecodePool uses -var poolSize = 16 - type JsonDecodePackage struct{ // Json strings from Redis ChecksumsRaw string @@ -29,36 +26,14 @@ func decodeString(toDecode string) (map[string]interface{}, error) { return unJson.(map[string]interface{}), nil } -// decodePool takes a channel it receives JsonDecodePackages from. These packages are decoded -// by a pool of workers which send their result back through their own channel. Returns error -// if any. -func DecodePool(chInput <-chan JsonDecodePackage) error { - consumers := make([]chan JsonDecodePackage, poolSize) - for i := range consumers { - consumers[i] = make(chan JsonDecodePackage) - go decodePackage(consumers[i]) +// decodePool takes a channel it receives JsonDecodePackages from and an error channel to forward errors. +// These packages are decoded by a pool of pollSize workers which send their result back through their own channel. +func DecodePool(chInput <-chan JsonDecodePackage, chError chan error, poolSize int) { + for i := 0; i < poolSize; i++ { + go func(in <-chan JsonDecodePackage, chErrorInternal chan error) { + chErrorInternal <- decodePackage(in) + }(chInput, chError) } - - distribution := func(ch <-chan JsonDecodePackage, consumers []chan JsonDecodePackage) { - defer func(providers []chan JsonDecodePackage) { - for _, channel := range providers { - close(channel) - } - }(consumers) - - for { - for _, consumer := range consumers { - select { - case packag := <- ch: - consumer <- packag - } - } - } - } - - go distribution(chInput, consumers) - - return nil } // decodePackage is the worker function for DecodePool. Reads from a channel and sends back decoded @@ -66,14 +41,13 @@ func DecodePool(chInput <-chan JsonDecodePackage) error { func decodePackage(chInput <-chan JsonDecodePackage) error { var err error - for input := range chInput{ - if input.ChecksumsProcessed == nil { + if input.ChecksumsProcessed == nil && input.ChecksumsRaw != "" { if input.ChecksumsProcessed, err = decodeString(input.ChecksumsRaw); err != nil { return err } } - if input.ConfigProcessed == nil { + if input.ConfigProcessed == nil && input.ConfigRaw != ""{ if input.ConfigProcessed, err = decodeString(input.ConfigRaw); err != nil { return err } diff --git a/json-decoder_test.go b/json-decoder_test.go index b377a331..b1519b2e 100644 --- a/json-decoder_test.go +++ b/json-decoder_test.go @@ -23,6 +23,7 @@ func Test_decodeString(t *testing.T) { func Test_DecodePool(t *testing.T) { var chInput = make(chan JsonDecodePackage) var chOutput = make(chan JsonDecodePackage) + var chError = make(chan error) var TestPackageA = JsonDecodePackage{ "{\"action_url_id\":\"761ff24e252d57581a7de5d9f417f717fb3c2d7f\",\"checkcommand_id\":\"f5e3b3b22741f40c74326fbcc79d9c331d8fa4ee\",\"customvars_checksum\":\"e9fea9581588f18cfb46969268a94166bd0474ae\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[\"a63234de9f608c4a4f86053870d79610ec58b258\"],\"groups_checksum\":\"9878a753d010eb1bbde57bb78727a6e6ba26aa51\",\"host_id\":\"330c09556cbb5e01c180343bb669a2d36b48dd2c\",\"name_checksum\":\"9f75a6ea3ea6f1692538c865133a8a08e48f06d5\",\"notes_url_id\":\"31bb5f9a69c659270e2bcd257b77353669c04d1e\",\"properties_checksum\":\"caff92fafc9a17097304f6c2fb9fa029d3ec8aa8\",\"zone_id\":\"407eaa141abcae8ee554e4fe4b9e9b726bac4b77\"}", @@ -40,9 +41,8 @@ func Test_DecodePool(t *testing.T) { &chOutput, } - go func() { - assert.NoError(t, DecodePool(chInput)) - }() + DecodePool(chInput, chError, 4) + chInput <- TestPackageA chInput <- TestPackageB From 1f81bab6c86855a53863087cec7919e3231a14fc Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Tue, 5 Mar 2019 15:38:35 +0100 Subject: [PATCH 05/10] Use jsonitor --- json-decoder.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index ba9f21df..33b6bbc3 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -1,9 +1,11 @@ package icingadb_json_decoder import ( - "encoding/json" + "github.com/json-iterator/go" ) +var json = jsoniter.ConfigCompatibleWithStandardLibrary + type JsonDecodePackage struct{ // Json strings from Redis ChecksumsRaw string @@ -12,7 +14,7 @@ type JsonDecodePackage struct{ ChecksumsProcessed map[string]interface{} ConfigProcessed map[string]interface{} // Package will be sent back through this channel - ChBack *chan JsonDecodePackage + ChBack *chan *JsonDecodePackage } // decodeString unmarshals the string toDecode using the json package. Returns the object as a @@ -28,9 +30,9 @@ func decodeString(toDecode string) (map[string]interface{}, error) { // decodePool takes a channel it receives JsonDecodePackages from and an error channel to forward errors. // These packages are decoded by a pool of pollSize workers which send their result back through their own channel. -func DecodePool(chInput <-chan JsonDecodePackage, chError chan error, poolSize int) { +func DecodePool(chInput <-chan *JsonDecodePackage, chError chan error, poolSize int) { for i := 0; i < poolSize; i++ { - go func(in <-chan JsonDecodePackage, chErrorInternal chan error) { + go func(in <-chan *JsonDecodePackage, chErrorInternal chan error) { chErrorInternal <- decodePackage(in) }(chInput, chError) } @@ -38,7 +40,7 @@ func DecodePool(chInput <-chan JsonDecodePackage, chError chan error, poolSize i // decodePackage is the worker function for DecodePool. Reads from a channel and sends back decoded // packages. Returns error if any. -func decodePackage(chInput <-chan JsonDecodePackage) error { +func decodePackage(chInput <-chan *JsonDecodePackage) error { var err error for input := range chInput{ @@ -53,7 +55,7 @@ func decodePackage(chInput <-chan JsonDecodePackage) error { } } - *input.ChBack <- JsonDecodePackage(input) + *input.ChBack <- input } From 3cd1eb7fdecb1851ebd1da45d8b362f160d371c2 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Tue, 5 Mar 2019 17:07:12 +0100 Subject: [PATCH 06/10] Use json tags --- json-decoder.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index 33b6bbc3..f4a2e2e4 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -1,31 +1,27 @@ package icingadb_json_decoder import ( + "git.icinga.com/icingadb/icingadb-main/configobject" "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary type JsonDecodePackage struct{ + Id [20]byte // Json strings from Redis ChecksumsRaw string ConfigRaw string - // When unmarshaled, results will be written here - ChecksumsProcessed map[string]interface{} - ConfigProcessed map[string]interface{} + Row configobject.Row // Package will be sent back through this channel - ChBack *chan *JsonDecodePackage + ChBack chan *JsonDecodePackage + Factory configobject.RowFactory } // decodeString unmarshals the string toDecode using the json package. Returns the object as a // map[string]interface and nil if successful, error if not. -func decodeString(toDecode string) (map[string]interface{}, error) { - var unJson interface{} = nil - if err := json.Unmarshal([]byte(toDecode), &unJson); err != nil { - return nil, err - } - - return unJson.(map[string]interface{}), nil +func decodeString(toDecode string, row configobject.Row) error { + return json.Unmarshal([]byte(toDecode), row) } // decodePool takes a channel it receives JsonDecodePackages from and an error channel to forward errors. @@ -44,19 +40,20 @@ func decodePackage(chInput <-chan *JsonDecodePackage) error { var err error for input := range chInput{ - if input.ChecksumsProcessed == nil && input.ChecksumsRaw != "" { - if input.ChecksumsProcessed, err = decodeString(input.ChecksumsRaw); err != nil { + row := input.Factory() + if input.ChecksumsRaw != "" { + if err := decodeString(input.ChecksumsRaw, row); err != nil { return err } } - if input.ConfigProcessed == nil && input.ConfigRaw != ""{ - if input.ConfigProcessed, err = decodeString(input.ConfigRaw); err != nil { + if input.ConfigRaw != ""{ + if err = decodeString(input.ConfigRaw, row); err != nil { return err } } - *input.ChBack <- input - + input.Row = row + input.ChBack <- input } return nil From 34224dacff17ea72b05fbf74491ae8e23f53721c Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Thu, 7 Mar 2019 17:21:29 +0100 Subject: [PATCH 07/10] Move Rows here This is done to prevent the circular dependency. I don't like this solution but it works for now. --- json-decoder.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index f4a2e2e4..c1f70d11 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -1,26 +1,35 @@ package icingadb_json_decoder import ( - "git.icinga.com/icingadb/icingadb-main/configobject" "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary +type Row interface { + InsertValues() []interface{} + UpdateValues() []interface{} + GetId() string + SetId(id string) +} + +type RowFactory func() Row + + type JsonDecodePackage struct{ Id [20]byte // Json strings from Redis ChecksumsRaw string ConfigRaw string - Row configobject.Row + Row Row // Package will be sent back through this channel ChBack chan *JsonDecodePackage - Factory configobject.RowFactory + Factory RowFactory } // decodeString unmarshals the string toDecode using the json package. Returns the object as a // map[string]interface and nil if successful, error if not. -func decodeString(toDecode string, row configobject.Row) error { +func decodeString(toDecode string, row Row) error { return json.Unmarshal([]byte(toDecode), row) } From 7b9b594f00b346cd149eeef79a383c35ffcb9a9b Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Fri, 8 Mar 2019 10:36:39 +0100 Subject: [PATCH 08/10] Move Row back into main/configobject package --- json-decoder.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index c1f70d11..f4a2e2e4 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -1,35 +1,26 @@ package icingadb_json_decoder import ( + "git.icinga.com/icingadb/icingadb-main/configobject" "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary -type Row interface { - InsertValues() []interface{} - UpdateValues() []interface{} - GetId() string - SetId(id string) -} - -type RowFactory func() Row - - type JsonDecodePackage struct{ Id [20]byte // Json strings from Redis ChecksumsRaw string ConfigRaw string - Row Row + Row configobject.Row // Package will be sent back through this channel ChBack chan *JsonDecodePackage - Factory RowFactory + Factory configobject.RowFactory } // decodeString unmarshals the string toDecode using the json package. Returns the object as a // map[string]interface and nil if successful, error if not. -func decodeString(toDecode string, row Row) error { +func decodeString(toDecode string, row configobject.Row) error { return json.Unmarshal([]byte(toDecode), row) } From 797ea7a1426426cb8ee266db7a6b2785ac189318 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Tue, 12 Mar 2019 12:47:28 +0100 Subject: [PATCH 09/10] Use chunks instead of single packages --- json-decoder.go | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/json-decoder.go b/json-decoder.go index f4a2e2e4..7b737df7 100644 --- a/json-decoder.go +++ b/json-decoder.go @@ -7,15 +7,24 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary -type JsonDecodePackage struct{ - Id [20]byte +type JsonDecodePackage struct { + // Id of the config object + Id string // Json strings from Redis ChecksumsRaw string + // Json strings from Redis ConfigRaw string + // Unmarshaled config object ready to be used in SQL Row configobject.Row // Package will be sent back through this channel - ChBack chan *JsonDecodePackage Factory configobject.RowFactory + // Object type (host, service, endpoint, command...) + ObjectType string +} + +type JsonDecodePackages struct { + Packages []JsonDecodePackage + ChBack chan<- []configobject.Row } // decodeString unmarshals the string toDecode using the json package. Returns the object as a @@ -26,9 +35,9 @@ func decodeString(toDecode string, row configobject.Row) error { // decodePool takes a channel it receives JsonDecodePackages from and an error channel to forward errors. // These packages are decoded by a pool of pollSize workers which send their result back through their own channel. -func DecodePool(chInput <-chan *JsonDecodePackage, chError chan error, poolSize int) { +func DecodePool(chInput <-chan *JsonDecodePackages, chError chan error, poolSize int) { for i := 0; i < poolSize; i++ { - go func(in <-chan *JsonDecodePackage, chErrorInternal chan error) { + go func(in <-chan *JsonDecodePackages, chErrorInternal chan error) { chErrorInternal <- decodePackage(in) }(chInput, chError) } @@ -36,24 +45,29 @@ func DecodePool(chInput <-chan *JsonDecodePackage, chError chan error, poolSize // decodePackage is the worker function for DecodePool. Reads from a channel and sends back decoded // packages. Returns error if any. -func decodePackage(chInput <-chan *JsonDecodePackage) error { +func decodePackage(chInput <-chan *JsonDecodePackages) error { var err error - for input := range chInput{ - row := input.Factory() - if input.ChecksumsRaw != "" { - if err := decodeString(input.ChecksumsRaw, row); err != nil { - return err + for pkgs := range chInput{ + var rows []configobject.Row + for _, pkg := range pkgs.Packages{ + row := pkg.Factory() + row.SetId(pkg.Id) + if pkg.ChecksumsRaw != "" { + if err := decodeString(pkg.ChecksumsRaw, row); err != nil { + return err + } } - } - if input.ConfigRaw != ""{ - if err = decodeString(input.ConfigRaw, row); err != nil { - return err + if pkg.ConfigRaw != ""{ + if err = decodeString(pkg.ConfigRaw, row); err != nil { + return err + } } + + rows = append(rows, row) } - input.Row = row - input.ChBack <- input + pkgs.ChBack <- rows } return nil From 31168fabbe03cef158362c910330396f9fc2de87 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Mon, 13 May 2019 14:41:59 +0200 Subject: [PATCH 10/10] Prepare repository merge --- json-decoder.go => json/json-decoder.go | 0 json-decoder_test.go => json/json-decoder_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename json-decoder.go => json/json-decoder.go (100%) rename json-decoder_test.go => json/json-decoder_test.go (100%) diff --git a/json-decoder.go b/json/json-decoder.go similarity index 100% rename from json-decoder.go rename to json/json-decoder.go diff --git a/json-decoder_test.go b/json/json-decoder_test.go similarity index 100% rename from json-decoder_test.go rename to json/json-decoder_test.go