Merge pull request #77 from Icinga/feature/stats-stream

XREAD from icinga:stats
This commit is contained in:
Noah Hilverling 2019-12-03 12:53:55 +01:00 committed by GitHub
commit 65521b4446
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 25 deletions

View file

@ -75,6 +75,7 @@ type RedisClient interface {
Publish(channel string, message interface{}) *redis.IntCmd
XRead(a *redis.XReadArgs) *redis.XStreamSliceCmd
XDel(stream string, ids ...string) *redis.IntCmd
XAdd(a *redis.XAddArgs) *redis.StringCmd
HKeys(key string) *redis.StringSliceCmd
HMGet(key string, fields ...string) *redis.SliceCmd
HGetAll(key string) *redis.StringStringMapCmd
@ -266,6 +267,27 @@ func (rdbw *RDBWrapper) XDel(stream string, ids ...string) *redis.IntCmd {
}
}
// XAdd is a wrapper for connection handling.
func (rdbw *RDBWrapper) XAdd(a *redis.XAddArgs) *redis.StringCmd {
for {
if !rdbw.IsConnected() {
rdbw.WaitForConnection()
continue
}
cmd := rdbw.Rdb.XAdd(a)
_, err := cmd.Result()
if err != nil {
if !rdbw.CheckConnection(false) {
continue
}
}
return cmd
}
}
// HKeys is a wrapper for connection handling.
func (rdbw *RDBWrapper) HKeys(key string) *redis.StringSliceCmd {
for {

View file

@ -6,6 +6,7 @@ import (
"crypto/sha1"
"encoding/json"
"github.com/Icinga/icingadb/connection"
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
)
@ -25,31 +26,38 @@ func Sha1bytes(bytes []byte) []byte {
func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment, chErr chan error) {
log.Info("Starting heartbeat listener")
subscription := rdb.Subscribe()
defer subscription.Close()
if err := subscription.Subscribe("icinga:stats"); err != nil {
chErr <- err
return
xReadArgs := redis.XReadArgs{
Streams: []string{"icinga:stats", "0-0"},
Count: 1,
Block: 0,
}
for {
msg, err := subscription.ReceiveMessage()
if err != nil {
chErr <- err
streams, errXR := rdb.XRead(&xReadArgs).Result()
if errXR != nil {
chErr <- errXR
return
}
log.Debug("Got heartbeat")
for _, stream := range streams {
for _, message := range stream.Messages {
log.Debug("Got heartbeat")
var unJson interface{} = nil
if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil {
chErr <- err
return
xReadArgs.Streams[1] = message.ID
if appJson, ok := message.Values["IcingaApplication"].(string); ok {
var unJson interface{} = nil
if errJU := json.Unmarshal([]byte(appJson), &unJson); errJU != nil {
chErr <- errJU
return
}
environment := unJson.(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string)
nodeName := unJson.(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["node_name"].(string)
env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), NodeName: nodeName}
chEnv <- env
}
}
}
environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string)
nodeName := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["node_name"].(string)
env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), NodeName: nodeName}
chEnv <- env
}
}

View file

@ -6,19 +6,21 @@ import (
"encoding/json"
"github.com/Icinga/icingadb/config/testbackends"
"github.com/Icinga/icingadb/connection"
"github.com/go-redis/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
var icingastate = "{\"IcingaApplication\":" +
"{\"status\": " +
const app = "{\"status\": " +
"{\"icingaapplication\":" +
"{\"app\":{" +
"\"environment\": \"\"," +
"\"node_name\": \"master1.icinga.test.com\"" +
"}}}}, \"config_dump_in_progress\": false}"
"}}}}"
const dump = "false"
func TestIcingaHeartbeatListener(t *testing.T) {
rdb := connection.NewRDBWrapper(testbackends.RedisTestAddr, 64)
@ -35,11 +37,17 @@ func TestIcingaHeartbeatListener(t *testing.T) {
time.Sleep(time.Second * 2)
var uj interface{} = nil
if err := json.Unmarshal([]byte(icingastate), &uj); err != nil {
assert.Nil(t, err)
}
assert.Nil(t, json.Unmarshal([]byte(app), &uj))
assert.Nil(t, json.Unmarshal([]byte(dump), &uj))
rdb.Rdb.Publish("icinga:stats", icingastate)
rdb.Rdb.XAdd(&redis.XAddArgs{
Stream: "icinga:stats",
ID: "*",
Values: map[string]interface{}{
"IcingaApplication": app,
"config_dump_in_progress": dump,
},
})
env := <-chEnv