Merge pull request #422 from Icinga/feature/test-ha-history-consistency

Integration tests: test that both icinga2 instances write identical history streams
This commit is contained in:
Julian Brost 2022-01-21 12:03:55 +01:00 committed by GitHub
commit 68a9f7faba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 267 additions and 36 deletions

View file

@ -6,7 +6,7 @@ require (
github.com/containerd/containerd v1.5.6 // indirect
github.com/go-redis/redis/v8 v8.11.3
github.com/google/uuid v1.3.0
github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca
github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d
github.com/jmoiron/sqlx v1.3.4
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.1

View file

@ -392,6 +392,10 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca h1:8EXQTKEqUkmF/9/9iIQNPKT0BF2AY9/zunbpcRx+fcI=
github.com/icinga/icinga-testing v0.0.0-20211112112017-64c69fdac3ca/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo=
github.com/icinga/icinga-testing v0.0.0-20211202132752-03a8b5369d7a h1:FoaQd9W/hmnJ5V63dbKE6M8WBmkyhptR16Xp5WXzLko=
github.com/icinga/icinga-testing v0.0.0-20211202132752-03a8b5369d7a/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo=
github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d h1:NDqQPFq81quN4fYRmpK53wz1Jx1Pik7IBT0KoxMMsag=
github.com/icinga/icinga-testing v0.0.0-20211203084126-748428acf86d/go.mod h1:VzB7xVUPFvAUgX1nDrheKHpQddvUIN24Sei4mLGelpo=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=

View file

@ -19,6 +19,7 @@ import (
"math"
"net/http"
"sort"
"strconv"
"testing"
"text/template"
"time"
@ -49,8 +50,15 @@ func testHistory(t *testing.T, numNodes int) {
Name string
Icinga2 services.Icinga2
IcingaClient *utils.Icinga2Client
Redis services.RedisServer
RedisClient *redis.Client
// Redis server and client for the instance used by the Icinga DB process.
Redis services.RedisServer
RedisClient *redis.Client
// Second Redis server and client to verify the consistency of the history streams in a HA setup.
// There is no Icinga DB process reading from this Redis so history events are not removed there.
ConsistencyRedis services.RedisServer
ConsistencyRedisClient *redis.Client
}
nodes := make([]*Node, numNodes)
@ -58,14 +66,17 @@ func testHistory(t *testing.T, numNodes int) {
for i := range nodes {
name := fmt.Sprintf("master-%d", i)
redisServer := it.RedisServerT(t)
consistencyRedisServer := it.RedisServerT(t)
icinga := it.Icinga2NodeT(t, name)
nodes[i] = &Node{
Name: name,
Icinga2: icinga,
IcingaClient: icinga.ApiClient(),
Redis: redisServer,
RedisClient: redisServer.Open(),
Name: name,
Icinga2: icinga,
IcingaClient: icinga.ApiClient(),
Redis: redisServer,
RedisClient: redisServer.Open(),
ConsistencyRedis: consistencyRedisServer,
ConsistencyRedisClient: consistencyRedisServer.Open(),
}
}
@ -87,13 +98,29 @@ func testHistory(t *testing.T, numNodes int) {
n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".crt", cert.CertificateToPem())
n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".key", cert.KeyToPem())
n.Icinga2.EnableIcingaDb(n.Redis)
n.Icinga2.EnableIcingaDb(n.ConsistencyRedis)
err = n.Icinga2.Reload()
require.NoError(t, err, "icinga2 should reload without error")
it.IcingaDbInstanceT(t, n.Redis, m)
{
n := n
t.Cleanup(func() { _ = n.RedisClient.Close() })
t.Cleanup(func() {
_ = n.RedisClient.Close()
_ = n.ConsistencyRedisClient.Close()
})
}
}
testConsistency := func(t *testing.T, stream string) {
if numNodes > 1 {
t.Run("Consistency", func(t *testing.T) {
var clients []*redis.Client
for _, node := range nodes {
clients = append(clients, node.ConsistencyRedisClient)
}
assertStreamConsistency(t, clients, stream)
})
}
}
@ -125,6 +152,8 @@ func testHistory(t *testing.T, numNodes int) {
t.Run("Acknowledgement", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:acknowledgement"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -158,8 +187,7 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, http.StatusOK, ackResponse.Results[0].Code, "acknowledge-problem result should have OK status")
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:acknowledgement")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -180,11 +208,22 @@ func testHistory(t *testing.T, numNodes int) {
assert.Equal(t, author, rows[0].Author, "acknowledgement author should match")
assert.Equal(t, comment, rows[0].Comment, "acknowledgement comment should match")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("Comment", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:comment"
type HistoryEvent struct {
Type string `db:"event_type"`
Author string `db:"author"`
Comment string `db:"comment"`
RemovedBy *string `db:"removed_by"`
}
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -212,33 +251,89 @@ func testHistory(t *testing.T, numNodes int) {
require.NoError(t, err, "decode add-comment response")
require.Equal(t, 1, len(addResponse.Results), "add-comment should return 1 result")
require.Equal(t, http.StatusOK, addResponse.Results[0].Code, "add-comment result should have OK status")
commentName := addResponse.Results[0].Name
// Ensure that downtime events have distinct timestamps in millisecond resolution.
time.Sleep(10 * time.Millisecond)
removedBy := utils.RandomString(8)
req, err = json.Marshal(ActionsRemoveCommentRequest{
Comment: commentName,
Author: removedBy,
})
require.NoError(t, err, "marshal remove-comment request")
response, err = client.PostJson("/v1/actions/remove-comment", bytes.NewBuffer(req))
require.NoError(t, err, "remove-comment")
require.Equal(t, 200, response.StatusCode, "remove-comment")
expected := []HistoryEvent{
{Type: "comment_add", Author: author, Comment: comment, RemovedBy: &removedBy},
{Type: "comment_remove", Author: author, Comment: comment, RemovedBy: &removedBy},
}
if !testing.Short() {
// Ensure that downtime events have distinct timestamps in millisecond resolution.
time.Sleep(10 * time.Millisecond)
expireAuthor := utils.RandomString(8)
expireComment := utils.RandomString(8)
expireDelay := time.Second
req, err = json.Marshal(ActionsAddCommentRequest{
Type: "Host",
Filter: fmt.Sprintf(`host.name==%q`, hostname),
Author: expireAuthor,
Comment: expireComment,
Expiry: float64(time.Now().Add(expireDelay).UnixMilli()) / 1000,
})
require.NoError(t, err, "marshal request")
response, err = client.PostJson("/v1/actions/add-comment", bytes.NewBuffer(req))
require.NoError(t, err, "add-comment")
require.Equal(t, 200, response.StatusCode, "add-comment")
// Icinga only expires comments every 60 seconds, so wait this long after the expiry time.
time.Sleep(expireDelay + 60*time.Second)
expected = append(expected,
HistoryEvent{Type: "comment_add", Author: expireAuthor, Comment: expireComment},
HistoryEvent{Type: "comment_remove", Author: expireAuthor, Comment: expireComment},
)
}
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:comment")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
type Row struct {
Author string `db:"author"`
Comment string `db:"comment"`
}
var rows []Row
err = db.Select(&rows, "SELECT c.author, c.comment"+
var rows []HistoryEvent
err = db.Select(&rows, "SELECT h.event_type, c.author, c.comment, c.removed_by"+
" FROM history h"+
" JOIN comment_history c ON c.comment_id = h.comment_history_id"+
" JOIN host ON host.id = c.host_id WHERE host.name = ?", hostname)
" JOIN host ON host.id = c.host_id WHERE host.name = ?"+
" ORDER BY h.event_time", hostname)
require.NoError(t, err, "select comment_history")
require.Equal(t, 1, len(rows), "there should be exactly one comment_history row")
assert.Equal(t, author, rows[0].Author, "author should match")
assert.Equal(t, comment, rows[0].Comment, "comment text should match")
assert.Equal(t, expected, rows, "comment history should match")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
if testing.Short() {
t.Skip("skipped comment expiry test")
}
})
t.Run("Downtime", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:downtime"
type HistoryEvent struct {
Event string `db:"event_type"`
Author string `db:"author"`
Comment string `db:"comment"`
Cancelled string `db:"has_been_cancelled"`
}
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -250,14 +345,16 @@ func testHistory(t *testing.T, numNodes int) {
})
downtimeStart := time.Now()
author := utils.RandomString(8)
comment := utils.RandomString(8)
req, err := json.Marshal(ActionsScheduleDowntimeRequest{
Type: "Host",
Filter: fmt.Sprintf(`host.name==%q`, hostname),
StartTime: downtimeStart.Unix(),
EndTime: downtimeStart.Add(time.Hour).Unix(),
Fixed: true,
Author: utils.RandomString(8),
Comment: utils.RandomString(8),
Author: author,
Comment: comment,
})
require.NoError(t, err, "marshal request")
response, err := client.PostJson("/v1/actions/schedule-downtime", bytes.NewBuffer(req))
@ -276,6 +373,7 @@ func testHistory(t *testing.T, numNodes int) {
req, err = json.Marshal(ActionsRemoveDowntimeRequest{
Downtime: downtimeName,
Author: utils.RandomString(8),
})
require.NoError(t, err, "marshal remove-downtime request")
response, err = client.PostJson("/v1/actions/remove-downtime", bytes.NewBuffer(req))
@ -283,13 +381,56 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, 200, response.StatusCode, "remove-downtime")
downtimeEnd := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:downtime")
expected := []HistoryEvent{
{Event: "downtime_start", Author: author, Comment: comment, Cancelled: "y"},
{Event: "downtime_end", Author: author, Comment: comment, Cancelled: "y"},
}
eventually.Assert(t, func(t require.TestingT) {
var rows []string
err = db.Select(&rows, "SELECT h.event_type FROM history h"+
if !testing.Short() {
// Ensure that downtime events have distinct timestamps in second resolution (for start time).
time.Sleep(time.Second)
expireStart := time.Now()
expireAuthor := utils.RandomString(8)
expireComment := utils.RandomString(8)
req, err := json.Marshal(ActionsScheduleDowntimeRequest{
Type: "Host",
Filter: fmt.Sprintf(`host.name==%q`, hostname),
StartTime: expireStart.Unix(),
EndTime: expireStart.Add(time.Second).Unix(),
Fixed: true,
Author: expireAuthor,
Comment: expireComment,
})
require.NoError(t, err, "marshal request")
response, err := client.PostJson("/v1/actions/schedule-downtime", bytes.NewBuffer(req))
require.NoError(t, err, "schedule-downtime")
require.Equal(t, 200, response.StatusCode, "schedule-downtime")
var scheduleResponse ActionsScheduleDowntimeResponse
err = json.NewDecoder(response.Body).Decode(&scheduleResponse)
require.NoError(t, err, "decode schedule-downtime response")
require.Equal(t, 1, len(scheduleResponse.Results), "schedule-downtime should return 1 result")
require.Equal(t, http.StatusOK, scheduleResponse.Results[0].Code, "schedule-downtime result should have OK status")
// Icinga only expires downtimes every 60 seconds, so wait this long in addition to the downtime duration.
time.Sleep(60*time.Second + 1*time.Second)
expected = append(expected,
HistoryEvent{Event: "downtime_start", Author: expireAuthor, Comment: expireComment, Cancelled: "n"},
HistoryEvent{Event: "downtime_end", Author: expireAuthor, Comment: expireComment, Cancelled: "n"},
)
downtimeEnd = time.Now()
}
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, stream)
}
if !eventually.Assert(t, func(t require.TestingT) {
var got []HistoryEvent
err = db.Select(&got, "SELECT h.event_type, d.author, d.comment, d.has_been_cancelled FROM history h"+
" JOIN host ON host.id = h.host_id"+
// Joining downtime_history checks that events are written to it.
" JOIN downtime_history d ON d.downtime_id = h.downtime_history_id"+
@ -298,14 +439,28 @@ func testHistory(t *testing.T, numNodes int) {
hostname, downtimeStart.Add(-time.Second).UnixMilli(), downtimeEnd.Add(time.Second).UnixMilli())
require.NoError(t, err, "select downtime_history")
require.Equal(t, []string{"downtime_start", "downtime_end"}, rows,
"downtime history should match expected result")
}, 5*time.Second, 200*time.Millisecond)
assert.Equal(t, expected, got, "downtime history should match expected result")
}, 5*time.Second, 200*time.Millisecond) {
t.Logf("\n%s", utils.MustT(t).String(utils.PrettySelect(db,
"SELECT h.event_time, h.event_type FROM history h"+
" JOIN host ON host.id = h.host_id"+
" LEFT JOIN downtime_history d ON d.downtime_id = h.downtime_history_id"+
" WHERE host.name = ?"+
" ORDER BY h.event_time", hostname)))
}
testConsistency(t, stream)
if testing.Short() {
t.Skip("skipped expiring downtime")
}
})
t.Run("Flapping", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:flapping"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -327,7 +482,7 @@ func testHistory(t *testing.T, numNodes int) {
timeAfter := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:flapping")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -344,11 +499,15 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, []string{"flapping_start", "flapping_end"}, rows,
"flapping history should match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("Notification", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:notification"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -403,7 +562,7 @@ func testHistory(t *testing.T, numNodes int) {
timeAfter := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:notification")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -420,11 +579,15 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, expected, rows, "notification history should match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
t.Run("State", func(t *testing.T) {
t.Parallel()
const stream = "icinga:history:stream:state"
hostname := utils.UniqueName(t, "host")
client.CreateHost(t, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
@ -467,7 +630,7 @@ func testHistory(t *testing.T, numNodes int) {
timeAfter := time.Now()
for _, n := range nodes {
assertEventuallyDrained(t, n.RedisClient, "icinga:history:stream:state")
assertEventuallyDrained(t, n.RedisClient, stream)
}
eventually.Assert(t, func(t require.TestingT) {
@ -482,6 +645,8 @@ func testHistory(t *testing.T, numNodes int) {
require.Equal(t, expected, rows, "state history does not match expected result")
}, 5*time.Second, 200*time.Millisecond)
testConsistency(t, stream)
})
}
@ -493,6 +658,62 @@ func assertEventuallyDrained(t testing.TB, redis *redis.Client, stream string) {
}, 5*time.Second, 10*time.Millisecond)
}
func assertStreamConsistency(t testing.TB, clients []*redis.Client, stream string) {
messages := make([][]map[string]interface{}, len(clients))
for i, c := range clients {
xmessages, err := c.XRange(context.Background(), stream, "-", "+").Result()
require.NoError(t, err, "reading %s should not fail", stream)
assert.NotEmpty(t, xmessages, "%s should not be empty on the Redis server %d", stream, i)
// Convert []XMessage into a slice of just the values. The IDs are generated by the Redis server based
// on the time the entry was written to the stream, so these IDs are expected to differ.
ms := make([]map[string]interface{}, 0, len(xmessages))
for _, xmessage := range xmessages {
values := xmessage.Values
// Delete endpoint_id as this is supposed to differ between both history streams as each endpoint
// writes its own ID into the stream.
delete(values, "endpoint_id")
// users_notified_ids represents a set, so order does not matter, sort for the comparison later.
if idsJson, ok := values["users_notified_ids"]; ok {
var ids []string
err := json.Unmarshal([]byte(idsJson.(string)), &ids)
require.NoError(t, err, "if users_notified_ids is present, it must be a JSON list of strings")
sort.Strings(ids)
values["users_notified_ids"] = ids
}
ms = append(ms, values)
}
sort.Slice(ms, func(i, j int) bool {
eventTime := func(v map[string]interface{}) uint64 {
s, ok := v["event_time"].(string)
if !ok {
return 0
}
u, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0
}
return u
}
sortKey := func(v map[string]interface{}) string {
return fmt.Sprintf("%020d-%s", eventTime(v), v["event_id"])
}
return sortKey(ms[i]) < sortKey(ms[j])
})
messages[i] = ms
}
for i := 0; i < len(messages)-1; i++ {
assert.Equal(t, messages[i], messages[i+1], "%s should be equal on both Redis servers", stream)
}
}
func processCheckResult(t *testing.T, client *utils.Icinga2Client, hostname string, status int) time.Time {
// Ensure that check results have distinct timestamps in millisecond resolution.
time.Sleep(10 * time.Millisecond)
@ -563,6 +784,11 @@ type ActionsAddCommentResponse struct {
} `json:"results"`
}
type ActionsRemoveCommentRequest struct {
Comment string `json:"comment"`
Author string `json:"author"`
}
type ActionsProcessCheckResultRequest struct {
Type string `json:"type"`
Filter string `json:"filter"`
@ -572,6 +798,7 @@ type ActionsProcessCheckResultRequest struct {
type ActionsRemoveDowntimeRequest struct {
Downtime string `json:"downtime"`
Author string `json:"author"`
}
type ActionsScheduleDowntimeRequest struct {