From 29301e4d9ea297abeb2698e5197063e96c77ef2a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 2 Dec 2019 18:24:06 +0100 Subject: [PATCH] XREAD from icinga:stats --- connection/redis.go | 22 ++++++++++++++++++++++ ha/heartbeat.go | 44 ++++++++++++++++++++++++++------------------ ha/heartbeat_test.go | 22 +++++++++++++++------- 3 files changed, 63 insertions(+), 25 deletions(-) diff --git a/connection/redis.go b/connection/redis.go index b9457524..759a88c1 100644 --- a/connection/redis.go +++ b/connection/redis.go @@ -75,6 +75,7 @@ type RedisClient interface { Publish(channel string, message interface{}) *redis.IntCmd XRead(a *redis.XReadArgs) *redis.XStreamSliceCmd XDel(stream string, ids ...string) *redis.IntCmd + XAdd(a *redis.XAddArgs) *redis.StringCmd HKeys(key string) *redis.StringSliceCmd HMGet(key string, fields ...string) *redis.SliceCmd HGetAll(key string) *redis.StringStringMapCmd @@ -266,6 +267,27 @@ func (rdbw *RDBWrapper) XDel(stream string, ids ...string) *redis.IntCmd { } } +// XAdd is a wrapper for connection handling. +func (rdbw *RDBWrapper) XAdd(a *redis.XAddArgs) *redis.StringCmd { + for { + if !rdbw.IsConnected() { + rdbw.WaitForConnection() + continue + } + + cmd := rdbw.Rdb.XAdd(a) + _, err := cmd.Result() + + if err != nil { + if !rdbw.CheckConnection(false) { + continue + } + } + + return cmd + } +} + // HKeys is a wrapper for connection handling. func (rdbw *RDBWrapper) HKeys(key string) *redis.StringSliceCmd { for { diff --git a/ha/heartbeat.go b/ha/heartbeat.go index 73bc56cb..e11195ff 100644 --- a/ha/heartbeat.go +++ b/ha/heartbeat.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "encoding/json" "github.com/Icinga/icingadb/connection" + "github.com/go-redis/redis" log "github.com/sirupsen/logrus" ) @@ -25,31 +26,38 @@ func Sha1bytes(bytes []byte) []byte { func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment, chErr chan error) { log.Info("Starting heartbeat listener") - subscription := rdb.Subscribe() - defer subscription.Close() - if err := subscription.Subscribe("icinga:stats"); err != nil { - chErr <- err - return + xReadArgs := redis.XReadArgs{ + Streams: []string{"icinga:stats", "0-0"}, + Count: 1, + Block: 0, } for { - msg, err := subscription.ReceiveMessage() - if err != nil { - chErr <- err + streams, errXR := rdb.XRead(&xReadArgs).Result() + if errXR != nil { + chErr <- errXR return } - log.Debug("Got heartbeat") + for _, stream := range streams { + for _, message := range stream.Messages { + log.Debug("Got heartbeat") - var unJson interface{} = nil - if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil { - chErr <- err - return + xReadArgs.Streams[1] = message.ID + + if appJson, ok := message.Values["IcingaApplication"].(string); ok { + var unJson interface{} = nil + if errJU := json.Unmarshal([]byte(appJson), &unJson); errJU != nil { + chErr <- errJU + return + } + + environment := unJson.(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) + nodeName := unJson.(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["node_name"].(string) + env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), NodeName: nodeName} + chEnv <- env + } + } } - - environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) - nodeName := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["node_name"].(string) - env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), NodeName: nodeName} - chEnv <- env } } diff --git a/ha/heartbeat_test.go b/ha/heartbeat_test.go index 20334ba1..c388d9fe 100644 --- a/ha/heartbeat_test.go +++ b/ha/heartbeat_test.go @@ -6,19 +6,21 @@ import ( "encoding/json" "github.com/Icinga/icingadb/config/testbackends" "github.com/Icinga/icingadb/connection" + "github.com/go-redis/redis" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "testing" "time" ) -var icingastate = "{\"IcingaApplication\":" + - "{\"status\": " + +const app = "{\"status\": " + "{\"icingaapplication\":" + "{\"app\":{" + "\"environment\": \"\"," + "\"node_name\": \"master1.icinga.test.com\"" + - "}}}}, \"config_dump_in_progress\": false}" + "}}}}" + +const dump = "false" func TestIcingaHeartbeatListener(t *testing.T) { rdb := connection.NewRDBWrapper(testbackends.RedisTestAddr, 64) @@ -35,11 +37,17 @@ func TestIcingaHeartbeatListener(t *testing.T) { time.Sleep(time.Second * 2) var uj interface{} = nil - if err := json.Unmarshal([]byte(icingastate), &uj); err != nil { - assert.Nil(t, err) - } + assert.Nil(t, json.Unmarshal([]byte(app), &uj)) + assert.Nil(t, json.Unmarshal([]byte(dump), &uj)) - rdb.Rdb.Publish("icinga:stats", icingastate) + rdb.Rdb.XAdd(&redis.XAddArgs{ + Stream: "icinga:stats", + ID: "*", + Values: map[string]interface{}{ + "IcingaApplication": app, + "config_dump_in_progress": dump, + }, + }) env := <-chEnv