Integration tests: test that both icinga2 instances write identical history streams

refs https://github.com/Icinga/icinga2/issues/9101
This commit is contained in:
Julian Brost 2021-12-02 14:37:40 +01:00
parent 4705619329
commit 02a36ad8b5
3 changed files with 126 additions and 16 deletions

View file

@ -6,7 +6,7 @@ require (
github.com/containerd/containerd v1.5.6 // indirect
github.com/go-redis/redis/v8 v8.11.3
github.com/google/uuid v1.3.0
github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca
github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d
github.com/jmoiron/sqlx v1.3.4
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.1

View file

@ -392,6 +392,10 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca h1:8EXQTKEqUkmF/9/9iIQNPKT0BF2AY9/zunbpcRx+fcI=
github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo=
github.com/icinga/icinga-testing v0.0.0-20211202132752-03a8b5369d7a h1:FoaQd9W/hmnJ5V63dbKE6M8WBmkyhptR16Xp5WXzLko=
github.com/icinga/icinga-testing v0.0.0-20211202132752-03a8b5369d7a/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo=
github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d h1:NDqQPFq81quN4fYRmpK53wz1Jx1Pik7IBT0KoxMMsag=
github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=

View file

@ -19,6 +19,7 @@ import (
"math"
"net/http"
"sort"
"strconv"
"testing"
"text/template"
"time"
@ -49,8 +50,15 @@ func testHistory(t *testing.T, numNodes int) {
Name string
Icinga2 services.Icinga2
IcingaClient *utils.Icinga2Client
Redis services.RedisServer
RedisClient *redis.Client
// Redis server and client for the instance used by the Icinga DB process.
Redis services.RedisServer
RedisClient *redis.Client
// Second Redis server and client to verify the consistency of the history streams in a HA setup.
// There is no Icinga DB process reading from this Redis so history events are not removed there.
ConsistencyRedis services.RedisServer
ConsistencyRedisClient *redis.Client
}
nodes := make([]*Node, numNodes)
@ -58,14 +66,17 @@ func testHistory(t *testing.T, numNodes int) {
for i := range nodes {
name := fmt.Sprintf("master-%d", i)
redisServer := it.RedisServerT(t)
consistencyRedisServer := it.RedisServerT(t)
icinga := it.Icinga2NodeT(t, name)
nodes[i] = &Node{
Name: name,
Icinga2: icinga,
IcingaClient: icinga.ApiClient(),
Redis: redisServer,
RedisClient: redisServer.Open(),
Name: name,
Icinga2: icinga,
IcingaClient: icinga.ApiClient(),
Redis: redisServer,
RedisClient: redisServer.Open(),
ConsistencyRedis: consistencyRedisServer,
ConsistencyRedisClient: consistencyRedisServer.Open(),
}
}
@ -87,13 +98,29 @@ func testHistory(t *testing.T, numNodes int) {
n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".crt", cert.CertificateToPem())
n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".key", cert.KeyToPem())
n.Icinga2.EnableIcingaDb(n.Redis)
n.Icinga2.EnableIcingaDb(n.ConsistencyRedis)
err = n.Icinga2.Reload()
require.NoError(t, err, "icinga2 should reload without error")
it.IcingaDbInstanceT(t, n.Redis, m)
{
n := n
t.Cleanup(func() { _ = n.RedisClient.Close() })
t.Cleanup(func() {
_ = n.RedisClient.Close()
_ = n.ConsistencyRedisClient.Close()
})
}
}
testConsistency := func(t *testing.T, stream string) {
if numNodes > 1 {
t.Run("Consistency", func(t *testing.T) {
var clients []*redis.Client
for _, node := range nodes {
clients = append(clients, node.ConsistencyRedisClient)
}
assertStreamConsistency(t, clients, stream)
})
}
}
@ -125,6 +152,8 @@ func testHistory(t *testing.T, numNodes int) {
t.Run("Acknowledgement", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:acknowledgement"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -158,8 +187,7 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, http.StatusOK, ackResponse.Results[0].Code, "acknowledge-problem result should have OK status")
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:acknowledgement")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -180,11 +208,15 @@ func testHistory(t *testing.T, numNodes int) {
assert.Equal(t, author, rows[0].Author, "acknowledgement author should match")
assert.Equal(t, comment, rows[0].Comment, "acknowledgement comment should match")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("Comment", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:comment"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -214,7 +246,7 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, http.StatusOK, addResponse.Results[0].Code, "add-comment result should have OK status")
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:comment")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -234,11 +266,15 @@ func testHistory(t *testing.T, numNodes int) {
assert.Equal(t, author, rows[0].Author, "author should match")
assert.Equal(t, comment, rows[0].Comment, "comment text should match")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("Downtime", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:downtime"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -284,7 +320,7 @@ func testHistory(t *testing.T, numNodes int) {
downtimeEnd := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:downtime")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -301,11 +337,15 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, []string{"downtime_start", "downtime_end"}, rows,
"downtime history should match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("Flapping", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:flapping"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -327,7 +367,7 @@ func testHistory(t *testing.T, numNodes int) {
timeAfter := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:flapping")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -344,11 +384,15 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, []string{"flapping_start", "flapping_end"}, rows,
"flapping history should match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("Notification", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:notification"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -403,7 +447,7 @@ func testHistory(t *testing.T, numNodes int) {
timeAfter := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:notification")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -420,11 +464,15 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, expected, rows, "notification history should match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("State", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:state"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -467,7 +515,7 @@ func testHistory(t *testing.T, numNodes int) {
timeAfter := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:state")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -482,6 +530,8 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, expected, rows, "state history does not match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
}
@ -493,6 +543,62 @@ func assertEventuallyDrained(t testing.TB, redis *redis.Client, stream string) {
}, 5*time.Second, 10*time.Millisecond)
}
func assertStreamConsistency(t testing.TB, clients []*redis.Client, stream string) {
messages := make([][]map[string]interface{}, len(clients))
for i, c := range clients {
xmessages, err := c.XRange(context.Background(), stream, "-", "+").Result()
require.NoError(t, err, "reading %s should not fail", stream)
assert.NotEmpty(t, xmessages, "%s should not be empty on the Redis server %d", stream, i)
// Convert []XMessage into a slice of just the values. The IDs are generated by the Redis server based
// on the time the entry was written to the stream, so these IDs are expected to differ.
ms := make([]map[string]interface{}, 0, len(xmessages))
for _, xmessage := range xmessages {
values := xmessage.Values
// Delete endpoint_id as this is supposed to differ between both history streams as each endpoint
// writes its own ID into the stream.
delete(values, "endpoint_id")
// users_notified_ids represents a set, so order does not matter, sort for the comparison later.
if idsJson, ok := values["users_notified_ids"]; ok {
var ids []string
err := json.Unmarshal([]byte(idsJson.(string)), &ids)
require.NoError(t, err, "if users_notified_ids is present, it must be a JSON list of strings")
sort.Strings(ids)
values["users_notified_ids"] = ids
}
ms = append(ms, values)
}
sort.Slice(ms, func(i, j int) bool {
eventTime := func(v map[string]interface{}) uint64 {
s, ok := v["event_time"].(string)
if !ok {
return 0
}
u, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0
}
return u
}
sortKey := func(v map[string]interface{}) string {
return fmt.Sprintf("%020d-%s", eventTime(v), v["event_id"])
}
return sortKey(ms[i]) < sortKey(ms[j])
})
messages[i] = ms
}
for i := 0; i < len(messages)-1; i++ {
assert.Equal(t, messages[i], messages[i+1], "%s should be equal on both Redis servers", stream)
}
}
func processCheckResult(t *testing.T, client *utils.Icinga2Client, hostname string, status int) time.Time {
// Ensure that check results have distinct timestamps in millisecond resolution.
time.Sleep(10 * time.Millisecond)