From 02a36ad8b594f00798cbde30ceacd0f58e6bbf95 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 2 Dec 2021 14:37:40 +0100 Subject: [PATCH] Integration tests: test that both icinga2 instances write identical history streams refs https://github.com/Icinga/icinga2/issues/9101 --- tests/go.mod | 2 +- tests/go.sum | 4 ++ tests/history_test.go | 136 +++++++++++++++++++++++++++++++++++++----- 3 files changed, 126 insertions(+), 16 deletions(-) diff --git a/tests/go.mod b/tests/go.mod index ef7025db..8a6b62ba 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/containerd/containerd v1.5.6 // indirect github.com/go-redis/redis/v8 v8.11.3 github.com/google/uuid v1.3.0 - github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca + github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d github.com/jmoiron/sqlx v1.3.4 github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.19.1 diff --git a/tests/go.sum b/tests/go.sum index e5f4656c..32a1228b 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -392,6 +392,10 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca h1:8EXQTKEqUkmF/9/9iIQNPKT0BF2AY9/zunbpcRx+fcI= github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo= +github.com/icinga/icinga-testing v0.0.0-20211202132752-03a8b5369d7a h1:FoaQd9W/hmnJ5V63dbKE6M8WBmkyhptR16Xp5WXzLko= +github.com/icinga/icinga-testing v0.0.0-20211202132752-03a8b5369d7a/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo= +github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d h1:NDqQPFq81quN4fYRmpK53wz1Jx1Pik7IBT0KoxMMsag= +github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= diff --git a/tests/history_test.go b/tests/history_test.go index 23794032..bae49c88 100644 --- a/tests/history_test.go +++ b/tests/history_test.go @@ -19,6 +19,7 @@ import ( "math" "net/http" "sort" + "strconv" "testing" "text/template" "time" @@ -49,8 +50,15 @@ func testHistory(t *testing.T, numNodes int) { Name string Icinga2 services.Icinga2 IcingaClient *utils.Icinga2Client - Redis services.RedisServer - RedisClient *redis.Client + + // Redis server and client for the instance used by the Icinga DB process. + Redis services.RedisServer + RedisClient *redis.Client + + // Second Redis server and client to verify the consistency of the history streams in a HA setup. + // There is no Icinga DB process reading from this Redis so history events are not removed there. + ConsistencyRedis services.RedisServer + ConsistencyRedisClient *redis.Client } nodes := make([]*Node, numNodes) @@ -58,14 +66,17 @@ func testHistory(t *testing.T, numNodes int) { for i := range nodes { name := fmt.Sprintf("master-%d", i) redisServer := it.RedisServerT(t) + consistencyRedisServer := it.RedisServerT(t) icinga := it.Icinga2NodeT(t, name) nodes[i] = &Node{ - Name: name, - Icinga2: icinga, - IcingaClient: icinga.ApiClient(), - Redis: redisServer, - RedisClient: redisServer.Open(), + Name: name, + Icinga2: icinga, + IcingaClient: icinga.ApiClient(), + Redis: redisServer, + RedisClient: redisServer.Open(), + ConsistencyRedis: consistencyRedisServer, + ConsistencyRedisClient: consistencyRedisServer.Open(), } } @@ -87,13 +98,29 @@ func testHistory(t *testing.T, numNodes int) { n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".crt", cert.CertificateToPem()) n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".key", cert.KeyToPem()) n.Icinga2.EnableIcingaDb(n.Redis) + n.Icinga2.EnableIcingaDb(n.ConsistencyRedis) err = n.Icinga2.Reload() require.NoError(t, err, "icinga2 should reload without error") it.IcingaDbInstanceT(t, n.Redis, m) { n := n - t.Cleanup(func() { _ = n.RedisClient.Close() }) + t.Cleanup(func() { + _ = n.RedisClient.Close() + _ = n.ConsistencyRedisClient.Close() + }) + } + } + + testConsistency := func(t *testing.T, stream string) { + if numNodes > 1 { + t.Run("Consistency", func(t *testing.T) { + var clients []*redis.Client + for _, node := range nodes { + clients = append(clients, node.ConsistencyRedisClient) + } + assertStreamConsistency(t, clients, stream) + }) } } @@ -125,6 +152,8 @@ func testHistory(t *testing.T, numNodes int) { t.Run("Acknowledgement", func(t *testing.T) { t.Parallel() + const stream = "icinga:history:stream:acknowledgement" + hostname := utils.UniqueName(t, "host") client.CreateHost(t, hostname, map[string]interface{}{ "attrs": map[string]interface{}{ @@ -158,8 +187,7 @@ func testHistory(t *testing.T, numNodes int) { require.Equal(t, http.StatusOK, ackResponse.Results[0].Code, "acknowledge-problem result should have OK status") for _, n := range nodes { - assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:acknowledgement") - + assertEventuallyDrained(t, n.RedisClient, stream) } eventually.Assert(t, func(t require.TestingT) { @@ -180,11 +208,15 @@ func testHistory(t *testing.T, numNodes int) { assert.Equal(t, author, rows[0].Author, "acknowledgement author should match") assert.Equal(t, comment, rows[0].Comment, "acknowledgement comment should match") }, 5*time.Second, 200*time.Millisecond) + + testConsistency(t, stream) }) t.Run("Comment", func(t *testing.T) { t.Parallel() + const stream = "icinga:history:stream:comment" + hostname := utils.UniqueName(t, "host") client.CreateHost(t, hostname, map[string]interface{}{ "attrs": map[string]interface{}{ @@ -214,7 +246,7 @@ func testHistory(t *testing.T, numNodes int) { require.Equal(t, http.StatusOK, addResponse.Results[0].Code, "add-comment result should have OK status") for _, n := range nodes { - assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:comment") + assertEventuallyDrained(t, n.RedisClient, stream) } eventually.Assert(t, func(t require.TestingT) { @@ -234,11 +266,15 @@ func testHistory(t *testing.T, numNodes int) { assert.Equal(t, author, rows[0].Author, "author should match") assert.Equal(t, comment, rows[0].Comment, "comment text should match") }, 5*time.Second, 200*time.Millisecond) + + testConsistency(t, stream) }) t.Run("Downtime", func(t *testing.T) { t.Parallel() + const stream = "icinga:history:stream:downtime" + hostname := utils.UniqueName(t, "host") client.CreateHost(t, hostname, map[string]interface{}{ "attrs": map[string]interface{}{ @@ -284,7 +320,7 @@ func testHistory(t *testing.T, numNodes int) { downtimeEnd := time.Now() for _, n := range nodes { - assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:downtime") + assertEventuallyDrained(t, n.RedisClient, stream) } eventually.Assert(t, func(t require.TestingT) { @@ -301,11 +337,15 @@ func testHistory(t *testing.T, numNodes int) { require.Equal(t, []string{"downtime_start", "downtime_end"}, rows, "downtime history should match expected result") }, 5*time.Second, 200*time.Millisecond) + + testConsistency(t, stream) }) t.Run("Flapping", func(t *testing.T) { t.Parallel() + const stream = "icinga:history:stream:flapping" + hostname := utils.UniqueName(t, "host") client.CreateHost(t, hostname, map[string]interface{}{ "attrs": map[string]interface{}{ @@ -327,7 +367,7 @@ func testHistory(t *testing.T, numNodes int) { timeAfter := time.Now() for _, n := range nodes { - assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:flapping") + assertEventuallyDrained(t, n.RedisClient, stream) } eventually.Assert(t, func(t require.TestingT) { @@ -344,11 +384,15 @@ func testHistory(t *testing.T, numNodes int) { require.Equal(t, []string{"flapping_start", "flapping_end"}, rows, "flapping history should match expected result") }, 5*time.Second, 200*time.Millisecond) + + testConsistency(t, stream) }) t.Run("Notification", func(t *testing.T) { t.Parallel() + const stream = "icinga:history:stream:notification" + hostname := utils.UniqueName(t, "host") client.CreateHost(t, hostname, map[string]interface{}{ "attrs": map[string]interface{}{ @@ -403,7 +447,7 @@ func testHistory(t *testing.T, numNodes int) { timeAfter := time.Now() for _, n := range nodes { - assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:notification") + assertEventuallyDrained(t, n.RedisClient, stream) } eventually.Assert(t, func(t require.TestingT) { @@ -420,11 +464,15 @@ func testHistory(t *testing.T, numNodes int) { require.Equal(t, expected, rows, "notification history should match expected result") }, 5*time.Second, 200*time.Millisecond) + + testConsistency(t, stream) }) t.Run("State", func(t *testing.T) { t.Parallel() + const stream = "icinga:history:stream:state" + hostname := utils.UniqueName(t, "host") client.CreateHost(t, hostname, map[string]interface{}{ "attrs": map[string]interface{}{ @@ -467,7 +515,7 @@ func testHistory(t *testing.T, numNodes int) { timeAfter := time.Now() for _, n := range nodes { - assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:state") + assertEventuallyDrained(t, n.RedisClient, stream) } eventually.Assert(t, func(t require.TestingT) { @@ -482,6 +530,8 @@ func testHistory(t *testing.T, numNodes int) { require.Equal(t, expected, rows, "state history does not match expected result") }, 5*time.Second, 200*time.Millisecond) + + testConsistency(t, stream) }) } @@ -493,6 +543,62 @@ func assertEventuallyDrained(t testing.TB, redis *redis.Client, stream string) { }, 5*time.Second, 10*time.Millisecond) } +func assertStreamConsistency(t testing.TB, clients []*redis.Client, stream string) { + messages := make([][]map[string]interface{}, len(clients)) + + for i, c := range clients { + xmessages, err := c.XRange(context.Background(), stream, "-", "+").Result() + require.NoError(t, err, "reading %s should not fail", stream) + assert.NotEmpty(t, xmessages, "%s should not be empty on the Redis server %d", stream, i) + + // Convert []XMessage into a slice of just the values. The IDs are generated by the Redis server based + // on the time the entry was written to the stream, so these IDs are expected to differ. + ms := make([]map[string]interface{}, 0, len(xmessages)) + for _, xmessage := range xmessages { + values := xmessage.Values + + // Delete endpoint_id as this is supposed to differ between both history streams as each endpoint + // writes its own ID into the stream. + delete(values, "endpoint_id") + + // users_notified_ids represents a set, so order does not matter, sort for the comparison later. + if idsJson, ok := values["users_notified_ids"]; ok { + var ids []string + err := json.Unmarshal([]byte(idsJson.(string)), &ids) + require.NoError(t, err, "if users_notified_ids is present, it must be a JSON list of strings") + sort.Strings(ids) + values["users_notified_ids"] = ids + } + + ms = append(ms, values) + } + sort.Slice(ms, func(i, j int) bool { + eventTime := func(v map[string]interface{}) uint64 { + s, ok := v["event_time"].(string) + if !ok { + return 0 + } + u, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0 + } + return u + } + + sortKey := func(v map[string]interface{}) string { + return fmt.Sprintf("%020d-%s", eventTime(v), v["event_id"]) + } + + return sortKey(ms[i]) < sortKey(ms[j]) + }) + messages[i] = ms + } + + for i := 0; i < len(messages)-1; i++ { + assert.Equal(t, messages[i], messages[i+1], "%s should be equal on both Redis servers", stream) + } +} + func processCheckResult(t *testing.T, client *utils.Icinga2Client, hostname string, status int) time.Time { // Ensure that check results have distinct timestamps in millisecond resolution. time.Sleep(10 * time.Millisecond)