Merge pull request #998 from Icinga/icinga-notifications-init

Initial Icinga Notifications Source
Julian Brost 2025-11-17 13:46:42 +01:00 committed by GitHub
commit ff1c49e63f
11 changed files with 1665 additions and 43 deletions

View file

@ -15,6 +15,7 @@ import (
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/notifications"
"github.com/okzk/sdnotify"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -168,13 +169,32 @@ func run() int {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
go func() {
logger.Info("Starting history sync")
{
var extraStages map[string]history.StageFunc
if cfg := cmd.Config.Notifications; cfg.Url != "" {
logger.Info("Starting Icinga Notifications source")
if err := hs.Sync(ctx); err != nil && !utils.IsContextCanceled(err) {
logger.Fatalf("%+v", err)
notificationsSource, err := notifications.NewNotificationsClient(
ctx,
db,
rc,
logs.GetChildLogger("notifications"),
cfg)
if err != nil {
logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err))
}
extraStages = notificationsSource.SyncExtraStages()
}
}()
go func() {
logger.Info("Starting history sync")
if err := hs.Sync(ctx, extraStages); err != nil && !utils.IsContextCanceled(err) {
logger.Fatalf("%+v", err)
}
}()
}
// Main loop
for {

View file

@ -139,3 +139,15 @@ redis:
# flapping:
# notification:
# state:
# Icinga DB can act as an event source for Icinga Notifications. If the following block is not empty, Icinga DB will
# submit events to the Icinga Notifications API.
#notifications:
# URL to the API root.
# url: http://localhost:5680
# Username to authenticate against the Icinga Notifications API.
# username: icingadb
# Password for the defined user.
# password: insecureinsecure

View file

@ -146,7 +146,7 @@ ICINGADB_LOGGING_OPTIONS=database:error,high-availability:debug
| runtime-updates | Runtime updates of config objects after the initial config synchronization. |
| telemetry | Reporting of Icinga DB status to Icinga 2 via Redis® (for monitoring purposes). |
## Retention
## Retention Configuration
By default, no historical data is deleted, which means that the longer the data is retained,
the more disk space is required to store it. History retention is an optional feature that allows to
@ -174,6 +174,26 @@ ICINGADB_RETENTION_OPTIONS=comment:356
| count | **Optional.** Number of old historical data a single query can delete in a `"DELETE FROM ... LIMIT count"` manner. Defaults to `5000`. |
| options | **Optional.** Map of history category to number of days to retain its data. Available categories are `acknowledgement`, `comment`, `downtime`, `flapping`, `notification` and `state`. |
## Notifications Configuration
!!! warning
The Icinga Notifications integration is a feature preview that you can try out.
However, incompatible changes may happen, so be sure to check the changelog for future updates.
Please do not use it in production.
Icinga DB can act as an event source for [Icinga Notifications](https://icinga.com/docs/icinga-notifications/).
If configured, Icinga DB will submit events to the Icinga Notifications API.
For YAML configuration, the options are part of the `notifications` dictionary.
For environment variables, each option is prefixed with `ICINGADB_NOTIFICATIONS_`.
| Option | Description |
|----------|-----------------------------------------------------------------------------------|
| url | **Optional.** Icinga Notifications API base URL, such as `http://localhost:5680`. |
| username | **Optional.** Icinga Notifications API user for this source. |
| password | **Optional.** Icinga Notifications API user password. |
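For illustration only (not part of this change), the environment variable form of the example values shown in config.example.yml above might look like this:

ICINGADB_NOTIFICATIONS_URL=http://localhost:5680
ICINGADB_NOTIFICATIONS_USERNAME=icingadb
ICINGADB_NOTIFICATIONS_PASSWORD=insecureinsecure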
## Appendix
### Duration String

View file

@ -4,6 +4,7 @@ import (
"github.com/creasty/defaults"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/notifications/source"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icingadb/pkg/icingadb/history"
"github.com/pkg/errors"
@ -15,10 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml"
// Config defines Icinga DB config.
type Config struct {
Database database.Config `yaml:"database" envPrefix:"DATABASE_"`
Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"`
Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"`
Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
Database database.Config `yaml:"database" envPrefix:"DATABASE_"`
Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"`
Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"`
Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
Notifications source.Config `yaml:"notifications" envPrefix:"NOTIFICATIONS_"`
}
func (c *Config) SetDefaults() {

View file

@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"reflect"
"slices"
"sync"
)
@ -37,7 +38,10 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
}
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
func (s Sync) Sync(ctx context.Context) error {
//
// The optional extraStages parameter allows specifying one additional stage per pipeline, identified by its key.
// This stage is executed after every other stage, but before the entry gets deleted from Redis.
func (s Sync) Sync(ctx context.Context, extraStages map[string]StageFunc) error {
g, ctx := errgroup.WithContext(ctx)
for key, pipeline := range syncPipelines {
@ -62,6 +66,18 @@ func (s Sync) Sync(ctx context.Context) error {
// it has processed it, even if the stage itself does not do anything with this specific entry. It should only
// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
// executed successfully.
//
// If an extra stage exists for this key, it will be appended to the pipeline. Thus, it is executed after every
// other pipeline action, but before deleteFromRedis.
// Shadowed variable to allow appending custom callbacks.
pipeline := pipeline
if extraStages != nil {
extraStage, ok := extraStages[key]
if ok {
pipeline = append(slices.Clip(pipeline), extraStage)
}
}
ch := make([]chan redis.XMessage, len(pipeline)+1)
for i := range ch {
@ -152,26 +168,25 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
}
counter.Add(uint64(len(ids)))
telemetry.Stats.History.Add(uint64(len(ids)))
case <-ctx.Done():
return ctx.Err()
}
}
}
// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
// StageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
// once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information
// about which pipeline this function is running in, e.g. "notification"), an in channel for the stage to read history
// events from and an out channel to forward history entries to after processing them successfully. A stage function
// is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On
// error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis
// and can be processed at a later time.
type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error
type StageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error
// writeOneEntityStage creates a stageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
// writeOneEntityStage creates a StageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
// For each history event it receives, it parses that event into a new instance of that entity type and writes it to
// the database. It writes exactly one entity to the database for each history event.
func writeOneEntityStage(structPtr any) stageFunc {
func writeOneEntityStage(structPtr any) StageFunc {
structifier := structify.MakeMapStructifier(
reflect.TypeOf(structPtr).Elem(),
"json",
@ -190,9 +205,9 @@ func writeOneEntityStage(structPtr any) stageFunc {
})
}
// writeMultiEntityStage creates a stageFunc from a function that takes a history event as an input and returns a
// writeMultiEntityStage creates a StageFunc from a function that takes a history event as an input and returns a
// (potentially empty) slice of v1.UpserterEntity instances that it then inserts into the database.
func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) stageFunc {
func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) StageFunc {
return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
type State struct {
Message redis.XMessage // Original event from Redis.
@ -304,7 +319,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse
}
}
// userNotificationStage is a specialized stageFunc that populates the user_notification_history table. It is executed
// userNotificationStage is a specialized StageFunc that populates the user_notification_history table. It is executed
// on the notification history stream and uses the users_notified_ids attribute to create an entry in the
// user_notification_history relation table for each user ID.
func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
@ -361,32 +376,70 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
})(ctx, s, key, in, out)
}
var syncPipelines = map[string][]stageFunc{
"notification": {
writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
userNotificationStage, // user_notification_history (depends on notification_history)
writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
// countElementStage increments the [Stats.History] counter.
//
// This StageFunc should be called last in each syncPipeline. Thus, it is executed before the final
// Sync.deleteFromRedis call in Sync.Sync, but also before any optional extra stages, which may block.
func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
defer close(out)
for {
select {
case msg, ok := <-in:
if !ok {
return nil
}
telemetry.Stats.History.Add(1)
out <- msg
case <-ctx.Done():
return ctx.Err()
}
}
}
const (
SyncPipelineAcknowledgement = "acknowledgement"
SyncPipelineComment = "comment"
SyncPipelineDowntime = "downtime"
SyncPipelineFlapping = "flapping"
SyncPipelineNotification = "notification"
SyncPipelineState = "state"
)
var syncPipelines = map[string][]StageFunc{
SyncPipelineAcknowledgement: {
writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
countElementStage,
},
"state": {
writeOneEntityStage((*v1.StateHistory)(nil)), // state_history
writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history)
writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
SyncPipelineComment: {
writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
countElementStage,
},
"downtime": {
SyncPipelineDowntime: {
writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history
writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history)
writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime
countElementStage,
},
"comment": {
writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
},
"flapping": {
SyncPipelineFlapping: {
writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history
writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
countElementStage,
},
"acknowledgement": {
writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
SyncPipelineNotification: {
writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
userNotificationStage, // user_notification_history (depends on notification_history)
writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
countElementStage,
},
SyncPipelineState: {
writeOneEntityStage((*v1.StateHistory)(nil)), // state_history
writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history)
writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
countElementStage,
},
}
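The StageFunc contract above (forward every entry from in to out; never forward on error so the entry stays in Redis) is easiest to see in a minimal pass-through stage. The following sketch is not part of this diff; passThroughStage and syncWithExtraStage are hypothetical names, and the code only illustrates, under the signatures shown above, how a caller could plug such a stage into the history sync.

package example

import (
	"context"

	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history"
)

// passThroughStage is a hypothetical extra stage that merely forwards every
// history entry, as the StageFunc contract requires.
func passThroughStage(
	ctx context.Context, _ history.Sync, _ string,
	in <-chan redis.XMessage, out chan<- redis.XMessage,
) error {
	defer close(out)

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				return nil
			}
			// A real stage would do its work here and only forward msg on
			// success, so failed entries are not deleted from Redis.
			out <- msg
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// syncWithExtraStage wires the stage into the "state" pipeline only.
func syncWithExtraStage(ctx context.Context, hs *history.Sync) error {
	return hs.Sync(ctx, map[string]history.StageFunc{
		history.SyncPipelineState: passThroughStage,
	})
}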

View file

@ -88,6 +88,15 @@ func (*HistoryDowntime) TableName() string {
return "history"
}
// DowntimeHistoryMeta is a combined HistoryMeta struct of DowntimeHistoryEntity and DowntimeHistory.
//
// It is used in the notifications package and became necessary because values from both structs are required.
type DowntimeHistoryMeta struct {
DowntimeHistoryEntity `json:",inline"`
DowntimeHistory `json:",inline"`
HistoryMeta `json:",inline"`
}
type SlaHistoryDowntime struct {
DowntimeHistoryEntity `json:",inline"`
HistoryTableMeta `json:",inline"`

View file

@ -14,17 +14,23 @@ import (
var Stats struct {
// Config & co. are to be increased by the T sync once for every T object synced.
Config, State, History, Overdue, HistoryCleanup com.Counter
Config com.Counter
State com.Counter
History com.Counter
Overdue com.Counter
HistoryCleanup com.Counter
NotificationSync com.Counter
}
// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2.
func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) {
counters := map[string]*com.Counter{
"config_sync": &Stats.Config,
"state_sync": &Stats.State,
"history_sync": &Stats.History,
"overdue_sync": &Stats.Overdue,
"history_cleanup": &Stats.HistoryCleanup,
"config_sync": &Stats.Config,
"state_sync": &Stats.State,
"history_sync": &Stats.History,
"overdue_sync": &Stats.Overdue,
"history_cleanup": &Stats.HistoryCleanup,
"notification_sync": &Stats.NotificationSync,
}
periodic.Start(ctx, time.Second, func(_ periodic.Tick) {

pkg/notifications/fetch.go (new file, 171 lines)
View file

@ -0,0 +1,171 @@
package notifications
import (
"context"
"encoding/json"
"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/retry"
"github.com/icinga/icinga-go-library/types"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"time"
)
// fetchHostServiceFromRedis retrieves the host and service names from Redis.
//
// If serviceId is nil, only the host name is fetched. Otherwise, both the host and service names are fetched.
func (client *Client) fetchHostServiceFromRedis(
ctx context.Context,
hostId, serviceId types.Binary,
) (hostName string, serviceName string, err error) {
getNameFromRedis := func(ctx context.Context, typ, id string) (string, error) {
key := "icinga:" + typ
var data string
err := retry.WithBackoff(
ctx,
func(ctx context.Context) (err error) {
data, err = client.redisClient.HGet(ctx, key, id).Result()
return
},
retry.Retryable,
backoff.DefaultBackoff,
retry.Settings{},
)
if err != nil {
return "", errors.Wrapf(err, "redis HGET %q, %q failed", key, id)
}
var result struct {
Name string `json:"name"`
}
if err := json.Unmarshal([]byte(data), &result); err != nil {
return "", errors.Wrap(err, "failed to unmarshal redis result")
}
return result.Name, nil
}
hostName, err = getNameFromRedis(ctx, "host", hostId.String())
if err != nil {
return
}
if serviceId != nil {
serviceName, err = getNameFromRedis(ctx, "service", serviceId.String())
if err != nil {
return
}
}
return
}
// customVar is used as an internal representation in Client.fetchCustomVarFromSql.
type customVar struct {
Name string `db:"name"`
Value types.String `db:"value"`
}
// getValue returns this customvar's value as a string, transforming SQL NULLs to empty strings.
func (cv customVar) getValue() string {
if cv.Value.Valid {
return cv.Value.String
}
return ""
}
// fetchCustomVarFromSql retrieves custom variables for the host and service from SQL.
//
// If serviceId is nil, only the host custom vars are fetched. Otherwise, both host and service custom vars are fetched.
func (client *Client) fetchCustomVarFromSql(
ctx context.Context,
hostId, serviceId types.Binary,
) (map[string]string, error) {
getCustomVarsFromSql := func(ctx context.Context, typ string, id types.Binary) ([]customVar, error) {
stmt, err := client.db.Preparex(client.db.Rebind(
`SELECT
customvar_flat.flatname AS name,
customvar_flat.flatvalue AS value
FROM ` + typ + `_customvar
JOIN customvar_flat
ON ` + typ + `_customvar.customvar_id = customvar_flat.customvar_id
WHERE ` + typ + `_customvar.` + typ + `_id = ?`))
if err != nil {
return nil, err
}
var customVars []customVar
if err := stmt.SelectContext(ctx, &customVars, id); err != nil {
return nil, err
}
return customVars, nil
}
customVars := make(map[string]string)
hostVars, err := getCustomVarsFromSql(ctx, "host", hostId)
if err != nil {
return nil, err
}
for _, hostVar := range hostVars {
customVars["host.vars."+hostVar.Name] = hostVar.getValue()
}
if serviceId != nil {
serviceVars, err := getCustomVarsFromSql(ctx, "service", serviceId)
if err != nil {
return nil, err
}
for _, serviceVar := range serviceVars {
customVars["service.vars."+serviceVar.Name] = serviceVar.getValue()
}
}
return customVars, nil
}
// hostServiceInformation contains the host name, an optional service name, and all custom variables.
//
// Returned from Client.fetchHostServiceData.
type hostServiceInformation struct {
hostName string
serviceName string
customVars map[string]string
}
// fetchHostServiceData resolves the object names and fetches the associated custom variables.
//
// If serviceId is not nil, both host and service data will be queried. Otherwise, only host information is fetched. To
// acquire the information, the fetchHostServiceFromRedis and fetchCustomVarFromSql methods are used concurrently with
// a timeout of three seconds.
func (client *Client) fetchHostServiceData(
ctx context.Context,
hostId, serviceId types.Binary,
) (*hostServiceInformation, error) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
ret := &hostServiceInformation{}
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
var err error
ret.hostName, ret.serviceName, err = client.fetchHostServiceFromRedis(ctx, hostId, serviceId)
return err
})
g.Go(func() error {
var err error
ret.customVars, err = client.fetchCustomVarFromSql(ctx, hostId, serviceId)
return err
})
if err := g.Wait(); err != nil {
return nil, err
}
return ret, nil
}
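To make the shape of the returned data concrete, here is a hypothetical value in the form fetchHostServiceData produces. It is not taken from this diff; the function name exampleHostServiceInformation, the host "web-01", the service "disk", and all custom variable names and values are made up, while the "host.vars." and "service.vars." prefixes follow the code above.

package notifications

// exampleHostServiceInformation returns a hypothetical value in the shape that
// fetchHostServiceData produces: object names from Redis plus flattened custom
// variables from SQL, prefixed with "host.vars." and "service.vars.".
func exampleHostServiceInformation() *hostServiceInformation {
	return &hostServiceInformation{
		hostName:    "web-01",
		serviceName: "disk",
		customVars: map[string]string{
			"host.vars.os":            "Linux",
			"service.vars.disk_wfree": "20%",
		},
	}
}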

View file

@ -0,0 +1,534 @@
package notifications
import (
"context"
"encoding/json"
"fmt"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/notifications/event"
"github.com/icinga/icinga-go-library/notifications/source"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icinga-go-library/structify"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-go-library/utils"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb/history"
v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"reflect"
"sync"
)
// Client is an Icinga Notifications compatible client implementation to push events to Icinga Notifications.
//
// A new Client should be created by the NewNotificationsClient function. New history entries can be submitted by
// calling the Client.Submit method.
type Client struct {
source.Config
db *database.DB
logger *logging.Logger
rulesInfo *source.RulesInfo // rulesInfo holds the latest rulesInfo fetched from Icinga Notifications.
ctx context.Context
notificationsClient *source.Client // The Icinga Notifications client used to interact with the API.
redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events.
submissionMutex sync.Mutex // submissionMutex protects struct fields in Client.Submit that are not safe for concurrent use, i.e., rulesInfo.
}
// NewNotificationsClient creates a new Client connected to an existing database and logger.
func NewNotificationsClient(
ctx context.Context,
db *database.DB,
rc *redis.Client,
logger *logging.Logger,
cfg source.Config,
) (*Client, error) {
notificationsClient, err := source.NewClient(cfg, "Icinga DB "+internal.Version.Version)
if err != nil {
return nil, err
}
return &Client{
Config: cfg,
db: db,
logger: logger,
ctx: ctx,
rulesInfo: &source.RulesInfo{},
notificationsClient: notificationsClient,
redisClient: rc,
}, nil
}
// evaluateRulesForObject checks each rule against the Icinga DB SQL database and returns matching rule IDs.
//
// Within the Icinga Notifications relational database, the rules are stored in rule.object_filter as a JSON object
// created by Icinga DB Web. This object contains SQL queries with bindvars for the Icinga DB relational database, to be
// executed with the given host, service and environment IDs. If this query returns at least one row, the rule is
// considered as matching.
//
// Icinga DB Web's JSON structure is described in:
// - https://github.com/Icinga/icingadb-web/pull/1289
// - https://github.com/Icinga/icingadb/pull/998#issuecomment-3442298348
func (client *Client) evaluateRulesForObject(ctx context.Context, hostId, serviceId, environmentId types.Binary) ([]string, error) {
const icingaDbWebRuleVersion = 1
type IcingaDbWebQuery struct {
Query string `json:"query"`
Parameters []any `json:"parameters"`
}
type IcingaDbWebRule struct {
Version int `json:"version"` // expect icingaDbWebRuleVersion
Queries struct {
Host *IcingaDbWebQuery `json:"host,omitempty"`
Service *IcingaDbWebQuery `json:"service,omitempty"`
} `json:"queries"`
}
outRuleIds := make([]string, 0, len(client.rulesInfo.Rules))
for id, filterExpr := range client.rulesInfo.Rules {
if filterExpr == "" {
outRuleIds = append(outRuleIds, id)
continue
}
var webRule IcingaDbWebRule
if err := json.Unmarshal([]byte(filterExpr), &webRule); err != nil {
return nil, errors.Wrap(err, "cannot decode rule filter expression as JSON into struct")
}
if version := webRule.Version; version != icingaDbWebRuleVersion {
return nil, errors.Errorf("decoded rule filter expression .Version is %d, %d expected", version, icingaDbWebRuleVersion)
}
var webQuery IcingaDbWebQuery
if !serviceId.Valid() {
// Evaluate rule for a host object
if webRule.Queries.Host == nil {
continue
}
webQuery = *webRule.Queries.Host
} else {
// Evaluate rule for a service object
if webRule.Queries.Service == nil {
continue
}
webQuery = *webRule.Queries.Service
}
queryArgs := make([]any, 0, len(webQuery.Parameters))
for _, param := range webQuery.Parameters {
switch param {
case ":host_id":
queryArgs = append(queryArgs, hostId)
case ":service_id":
if !serviceId.Valid() {
return nil, errors.New("host rule filter expression contains :service_id for replacement")
}
queryArgs = append(queryArgs, serviceId)
case ":environment_id":
queryArgs = append(queryArgs, environmentId)
default:
queryArgs = append(queryArgs, param)
}
}
matches, err := func() (bool, error) {
rows, err := client.db.QueryContext(ctx, client.db.Rebind(webQuery.Query), queryArgs...)
if err != nil {
return false, err
}
defer func() { _ = rows.Close() }()
return rows.Next(), nil
}()
if err != nil {
return nil, errors.Wrapf(err, "cannot fetch rule %q from %q", id, filterExpr)
} else if !matches {
continue
}
outRuleIds = append(outRuleIds, id)
}
return outRuleIds, nil
}
// buildCommonEvent creates an event.Event based on Host and (optional) Service IDs.
//
// This function is used by all event builders to create a common event structure that includes the host and service
// names, an Icinga DB Web reference, and the tags for the event.
// Any event type-specific information (like severity, message, etc.) is added by the specific event builders.
func (client *Client) buildCommonEvent(
ctx context.Context,
hostId, serviceId types.Binary,
) (*event.Event, *hostServiceInformation, error) {
info, err := client.fetchHostServiceData(ctx, hostId, serviceId)
if err != nil {
return nil, nil, err
}
var (
objectName string
objectUrl string
objectTags map[string]string
)
if info.serviceName != "" {
objectName = info.hostName + "!" + info.serviceName
objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(info.serviceName) + "&host.name=" + utils.RawUrlEncode(info.hostName)
objectTags = map[string]string{
"host": info.hostName,
"service": info.serviceName,
}
} else {
objectName = info.hostName
objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(info.hostName)
objectTags = map[string]string{
"host": info.hostName,
}
}
return &event.Event{
Name: objectName,
URL: objectUrl,
Tags: objectTags,
ExtraTags: info.customVars,
}, info, nil
}
// buildStateHistoryEvent builds a fully initialized event.Event from a state history entry.
//
// The resulting event will have all the necessary information for a state change event, and must
// not be further modified by the caller.
func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
ev, info, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
ev.Type = event.TypeState
if info.serviceName != "" {
switch h.HardState {
case 0:
ev.Severity = event.SeverityOK
case 1:
ev.Severity = event.SeverityWarning
case 2:
ev.Severity = event.SeverityCrit
case 3:
ev.Severity = event.SeverityErr
default:
return nil, fmt.Errorf("unexpected service state %d", h.HardState)
}
} else {
switch h.HardState {
case 0:
ev.Severity = event.SeverityOK
case 1:
ev.Severity = event.SeverityCrit
default:
return nil, fmt.Errorf("unexpected host state %d", h.HardState)
}
}
if h.Output.Valid {
ev.Message = h.Output.String
}
if h.LongOutput.Valid {
ev.Message += "\n" + h.LongOutput.String
}
return ev, nil
}
// buildDowntimeHistoryMetaEvent from a downtime history entry.
func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
switch h.EventType {
case "downtime_start":
ev.Type = event.TypeDowntimeStart
ev.Username = h.Author
ev.Message = h.Comment
ev.Mute = types.MakeBool(true)
ev.MuteReason = "Checkable is in downtime"
case "downtime_end":
ev.Mute = types.MakeBool(false)
if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool {
ev.Type = event.TypeDowntimeRemoved
ev.Message = "Downtime was cancelled"
if h.CancelledBy.Valid {
ev.Username = h.CancelledBy.String
}
} else {
ev.Type = event.TypeDowntimeEnd
ev.Message = "Downtime expired"
}
default:
return nil, fmt.Errorf("unexpected event type %q", h.EventType)
}
return ev, nil
}
// buildFlappingHistoryEvent from a flapping history entry.
func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
if h.PercentStateChangeEnd.Valid {
ev.Type = event.TypeFlappingEnd
ev.Message = fmt.Sprintf(
"Checkable stopped flapping (Current flapping value %.2f%% < low threshold %.2f%%)",
h.PercentStateChangeEnd.Float64, h.FlappingThresholdLow)
ev.Mute = types.MakeBool(false)
} else if h.PercentStateChangeStart.Valid {
ev.Type = event.TypeFlappingStart
ev.Message = fmt.Sprintf(
"Checkable started flapping (Current flapping value %.2f%% > high threshold %.2f%%)",
h.PercentStateChangeStart.Float64, h.FlappingThresholdHigh)
ev.Mute = types.MakeBool(true)
ev.MuteReason = "Checkable is flapping"
} else {
return nil, errors.New("flapping history entry has neither percent_state_change_start nor percent_state_change_end")
}
return ev, nil
}
// buildAcknowledgementHistoryEvent from an acknowledgment history entry.
func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
if !h.ClearTime.Time().IsZero() {
ev.Type = event.TypeAcknowledgementCleared
ev.Message = "Acknowledgement was cleared"
ev.Mute = types.MakeBool(false)
if h.ClearedBy.Valid {
ev.Username = h.ClearedBy.String
}
} else if !h.SetTime.Time().IsZero() {
ev.Type = event.TypeAcknowledgementSet
ev.Mute = types.MakeBool(true)
ev.MuteReason = "Checkable was acknowledged"
if h.Comment.Valid {
ev.Message = h.Comment.String
} else {
ev.Message = "Checkable was acknowledged"
}
if h.Author.Valid {
ev.Username = h.Author.String
}
} else {
return nil, errors.New("acknowledgment history entry has neither a set_time nor a clear_time")
}
return ev, nil
}
// Submit this [database.Entity] to the Icinga Notifications API.
//
// Based on the entity's type, a different kind of event will be constructed. The event will be sent to the API in a
// blocking fashion.
//
// Returns true if this entity was processed or cannot be processed any further. Returns false if this entity should be
// retried later.
//
// This method uses the Client's logger.
func (client *Client) Submit(entity database.Entity) bool {
if client.ctx.Err() != nil {
client.logger.Errorw("Cannot process submitted entity as client context is done", zap.Error(client.ctx.Err()))
return true
}
var (
ev *event.Event
eventErr error
metaHistory v1history.HistoryTableMeta
)
// Keep the type switch in sync with the values of syncKeyStructPtrs in SyncExtraStages below.
switch h := entity.(type) {
case *v1history.AcknowledgementHistory:
ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h)
metaHistory = h.HistoryTableMeta
case *v1history.DowntimeHistoryMeta:
ev, eventErr = client.buildDowntimeHistoryMetaEvent(client.ctx, h)
metaHistory = h.HistoryTableMeta
case *v1history.FlappingHistory:
ev, eventErr = client.buildFlappingHistoryEvent(client.ctx, h)
metaHistory = h.HistoryTableMeta
case *v1history.StateHistory:
if h.StateType != common.HardState {
return true
}
ev, eventErr = client.buildStateHistoryEvent(client.ctx, h)
metaHistory = h.HistoryTableMeta
default:
client.logger.Error("Cannot process unsupported type", zap.String("type", fmt.Sprintf("%T", h)))
return true
}
if eventErr != nil {
client.logger.Errorw("Cannot build event from history entry",
zap.String("type", fmt.Sprintf("%T", entity)),
zap.Error(eventErr))
return true
} else if ev == nil {
// This really should not happen.
client.logger.Errorw("No event was built, but no error was reported",
zap.String("type", fmt.Sprintf("%T", entity)))
return true
}
eventLogger := client.logger.With(zap.Object(
"event",
zapcore.ObjectMarshalerFunc(func(encoder zapcore.ObjectEncoder) error {
encoder.AddString("name", ev.Name)
encoder.AddString("type", ev.Type.String())
return nil
}),
))
// The following code accesses Client.rulesInfo.
client.submissionMutex.Lock()
defer client.submissionMutex.Unlock()
// This loop allows resubmitting an event if the rules have changed. The first try would be the rule update, the
// second try would be the resubmit, and the third try would be for bad luck, e.g., when a second rule update just
// crept in between. If there are three subsequent rule updates, something is wrong.
for try := 0; try < 3; try++ {
eventRuleIds, err := client.evaluateRulesForObject(
client.ctx,
metaHistory.HostId,
metaHistory.ServiceId,
metaHistory.EnvironmentId)
if err != nil {
// While returning false would be more correct, this would result in never being able to refetch new rule
// versions. Consider an invalid object filter expression, which would then be impossible to get rid of.
eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err))
eventRuleIds = []string{}
}
ev.RulesVersion = client.rulesInfo.Version
ev.RuleIds = eventRuleIds
newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev)
if errors.Is(err, source.ErrRulesOutdated) {
eventLogger.Infow("Received a rule update from Icinga Notifications, resubmitting event",
zap.String("old_rules_version", client.rulesInfo.Version),
zap.String("new_rules_version", newEventRules.Version))
client.rulesInfo = newEventRules
continue
} else if err != nil {
eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried",
zap.String("rules_version", client.rulesInfo.Version),
zap.Any("rules", eventRuleIds),
zap.Error(err))
return false
}
eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds))
return true
}
eventLogger.Error("Received three rule updates from Icinga Notifications in a row, event will be retried")
return false
}
// SyncExtraStages returns a map of history sync keys to [history.StageFunc] to be used for [history.Sync].
//
// Passing the return value of this method as the extraStages parameter to [history.Sync] results in forwarding events
// from the Icinga DB history stream to Icinga Notifications after being re-sorted via the StreamSorter.
func (client *Client) SyncExtraStages() map[string]history.StageFunc {
var syncKeyStructPtrs = map[string]any{
history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil),
history.SyncPipelineDowntime: (*v1history.DowntimeHistoryMeta)(nil),
history.SyncPipelineFlapping: (*v1history.FlappingHistory)(nil),
history.SyncPipelineState: (*v1history.StateHistory)(nil),
}
sorterCallbackFn := func(msg redis.XMessage, key string) bool {
makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) {
structPtr, ok := syncKeyStructPtrs[key]
if !ok {
return nil, fmt.Errorf("key is not part of syncKeyStructPtrs")
}
structifier := structify.MakeMapStructifier(
reflect.TypeOf(structPtr).Elem(),
"json",
contracts.SafeInit)
val, err := structifier(values)
if err != nil {
return nil, errors.Wrapf(err, "can't structify values %#v for %q", values, key)
}
entity, ok := val.(database.Entity)
if !ok {
return nil, fmt.Errorf("structifier returned %T which does not implement database.Entity", val)
}
return entity, nil
}
entity, err := makeEntity(key, msg.Values)
if err != nil {
client.logger.Errorw("Failed to create database.Entity out of Redis stream message",
zap.Error(err),
zap.String("key", key),
zap.String("id", msg.ID))
return false
}
success := client.Submit(entity)
if success {
telemetry.Stats.NotificationSync.Add(1)
}
return success
}
pipelineFn := NewStreamSorter(client.ctx, client.logger, sorterCallbackFn).PipelineFunc
extraStages := make(map[string]history.StageFunc)
for k := range syncKeyStructPtrs {
extraStages[k] = pipelineFn
}
return extraStages
}
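For reference, a hypothetical rule.object_filter value in the shape evaluateRulesForObject expects is sketched below. It is not taken from this diff or from Icinga DB Web; only the structure (version, queries.host / queries.service, and the ":host_id", ":service_id", ":environment_id" placeholders in parameters) follows the decoding code above, while the SQL queries and column names are made up, as is the constant name exampleObjectFilter.

package notifications

// exampleObjectFilter is a made-up filter expression illustrating the JSON
// structure decoded by evaluateRulesForObject. The placeholders in "parameters"
// are replaced with the actual host, service and environment IDs at query time.
const exampleObjectFilter = `{
  "version": 1,
  "queries": {
    "host": {
      "query": "SELECT 1 FROM host WHERE host.id = ? AND host.environment_id = ?",
      "parameters": [":host_id", ":environment_id"]
    },
    "service": {
      "query": "SELECT 1 FROM service WHERE service.id = ? AND service.environment_id = ?",
      "parameters": [":service_id", ":environment_id"]
    }
  }
}`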

pkg/notifications/sorter.go (new file, 416 lines)
View file

@ -0,0 +1,416 @@
package notifications
import (
"container/heap"
"context"
"fmt"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icingadb/pkg/icingadb/history"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"strconv"
"strings"
"time"
)
// parseRedisStreamId parses a Redis Stream ID and returns the timestamp in ms and the sequence number, or an error.
func parseRedisStreamId(redisStreamId string) (int64, int64, error) {
dashPos := strings.IndexRune(redisStreamId, '-')
if dashPos <= 0 {
return 0, 0, errors.Errorf("value %q does not satisfy Redis Stream ID pattern", redisStreamId)
}
ms, err := strconv.ParseInt(redisStreamId[:dashPos], 10, 64)
if err != nil {
return 0, 0, errors.Wrapf(
err,
"timestamp part of the Redis Stream ID %q cannot be parsed to int", redisStreamId)
}
seq, err := strconv.ParseInt(redisStreamId[dashPos+1:], 10, 64)
if err != nil {
return 0, 0, errors.Wrapf(
err,
"sequence number of the Redis Stream ID %q cannot be parsed to int", redisStreamId)
}
return ms, seq, nil
}
// streamSorterSubmission is one submission to a StreamSorter, allowing to be sorted by the Redis Stream ID - both via
// timestamp and the sequence number as a fallback - as well as the submission timestamp for duplicates if milliseconds
// are not precise enough.
type streamSorterSubmission struct {
// msg is the Redis message to be forwarded to out after this submission was sorted.
msg redis.XMessage
key string
out chan<- redis.XMessage
// Required for sorting.
streamIdMs int64 // streamIdMs is the Redis Stream ID timestamp part (milliseconds)
streamIdSeq int64 // streamIdSeq is the Redis Stream ID sequence number
submitTime time.Time // submitTime is the timestamp when the element was submitted
}
// MarshalLogObject implements [zapcore.ObjectMarshaler].
func (sub *streamSorterSubmission) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddInt64("redis-id-ms", sub.streamIdMs)
encoder.AddInt64("redis-id-seq", sub.streamIdSeq)
encoder.AddTime("submit-time", sub.submitTime)
encoder.AddString("out", fmt.Sprint(sub.out))
return nil
}
// streamSorterSubmissions implements [heap.Interface] for []streamSorterSubmission.
type streamSorterSubmissions []*streamSorterSubmission
// Len implements [sort.Interface] required by [heap.Interface].
func (subs streamSorterSubmissions) Len() int { return len(subs) }
// Swap implements [sort.Interface] required by [heap.Interface].
func (subs streamSorterSubmissions) Swap(i, j int) { subs[i], subs[j] = subs[j], subs[i] }
// Less implements [sort.Interface] required by [heap.Interface].
func (subs streamSorterSubmissions) Less(i, j int) bool {
a, b := subs[i], subs[j]
if a.streamIdMs != b.streamIdMs {
return a.streamIdMs < b.streamIdMs
}
if a.streamIdSeq != b.streamIdSeq {
return a.streamIdSeq < b.streamIdSeq
}
return a.submitTime.Before(b.submitTime)
}
// Push implements [heap.Interface].
func (subs *streamSorterSubmissions) Push(x any) {
sub, ok := x.(*streamSorterSubmission)
if !ok {
panic(fmt.Sprintf("streamSorterSubmissions.Push received x of %T", x))
}
*subs = append(*subs, sub)
}
// Pop implements [heap.Interface].
func (subs *streamSorterSubmissions) Pop() any {
old := *subs
n := len(old)
x := old[n-1]
*subs = old[0 : n-1]
return x
}
// Peek returns the smallest element from the heap without removing it, or nil if the heap is empty.
func (subs streamSorterSubmissions) Peek() *streamSorterSubmission {
if len(subs) > 0 {
return subs[0]
} else {
return nil
}
}
// StreamSorter is a helper that can be used to intercept messages from different history sync pipelines and pass them
// to a callback in the order given by their Redis Stream ID (sorted across all involved streams).
//
// After a message is received, it is kept in a priority queue for three seconds to wait for possible messages from
// another stream with a smaller ID. Thus, if a message is received delayed for more than three seconds, it will be
// relayed out of order. The StreamSorter can only ensure ordering up to a certain degree of chaos.
//
// The callback function receives the [redis.XMessage] together with the Redis stream name (key) for additional
// context. The callback function is supposed to return true on success. Otherwise, the callback will be retried until
// it succeeds.
type StreamSorter struct {
ctx context.Context
logger *logging.Logger
callbackFn func(redis.XMessage, string) bool
submissionCh chan *streamSorterSubmission
// registerOutCh is used by PipelineFunc() to register output channels with worker()
registerOutCh chan chan<- redis.XMessage
// closeOutCh is used by PipelineFunc() to signal to worker() that there will be no more submissions destined for
// that output channel and it can be closed by the worker after it processed all pending submissions for it.
closeOutCh chan chan<- redis.XMessage
// The following fields should only be changed for the tests.
// callbackMaxDelay is the maximum delay for continuously failing callbacks. Defaults to 10s.
callbackMaxDelay time.Duration
// submissionMinAge is the minimum age for a submission before being forwarded. Defaults to 3s.
submissionMinAge time.Duration
// isVerbose enables verbose debug logging. One probably does not want this outside the tests.
isVerbose bool
}
// NewStreamSorter creates a StreamSorter honoring the given context and returning elements to the callback function.
func NewStreamSorter(
ctx context.Context,
logger *logging.Logger,
callbackFn func(msg redis.XMessage, key string) bool,
) *StreamSorter {
sorter := &StreamSorter{
ctx: ctx,
logger: logger,
callbackFn: callbackFn,
submissionCh: make(chan *streamSorterSubmission),
registerOutCh: make(chan chan<- redis.XMessage),
closeOutCh: make(chan chan<- redis.XMessage),
callbackMaxDelay: 10 * time.Second,
submissionMinAge: 3 * time.Second,
}
go sorter.worker()
return sorter
}
// verbose produces a debug log message if StreamSorter.isVerbose is set.
func (sorter *StreamSorter) verbose(msg string, keysAndValues ...any) {
// When used in tests and the test context is done, using the logger results in a data race. Since there are a few
// log messages which might occur after the test has finished, better not log at all.
// https://github.com/uber-go/zap/issues/687#issuecomment-473382859
if sorter.ctx.Err() != nil {
return
}
if !sorter.isVerbose {
return
}
sorter.logger.Debugw(msg, keysAndValues...)
}
// startCallback initiates the callback in a background goroutine and returns a channel that is closed once the callback
// has succeeded. It retries the callback with a backoff until it signals success by returning true.
func (sorter *StreamSorter) startCallback(msg redis.XMessage, key string) <-chan struct{} {
callbackCh := make(chan struct{})
go func() {
defer close(callbackCh)
callbackDelay := time.Duration(0)
for try := 0; ; try++ {
select {
case <-sorter.ctx.Done():
return
case <-time.After(callbackDelay):
}
start := time.Now()
success := sorter.callbackFn(msg, key)
sorter.verbose("startCallback: finished executing callbackFn",
zap.String("id", msg.ID),
zap.Bool("success", success),
zap.Int("try", try),
zap.Duration("duration", time.Since(start)),
zap.Duration("next-delay", callbackDelay))
if success {
return
} else {
callbackDelay = min(max(time.Millisecond, 2*callbackDelay), sorter.callbackMaxDelay)
}
}
}()
return callbackCh
}
// worker is the background worker started in a goroutine from NewStreamSorter. It reacts to messages from the
// channels and runs until StreamSorter.ctx is done.
func (sorter *StreamSorter) worker() {
// When a streamSorterSubmission is created in the submit method, the current time.Time is added to the struct.
// Only once a submission is at least submissionMinAge (three seconds by default) old will it be popped from the
// heap and passed to startCallback in its own goroutine to execute the callback function.
var submissionHeap streamSorterSubmissions
// Each registered output is stored in the registeredOutputs map, mapping output channels to the following struct.
// It counts pending submissions in the heap for each received submission from submissionCh and can be marked as
// closed to be cleaned up after its work is done.
type OutputState struct {
pending int
close bool
}
registeredOutputs := make(map[chan<- redis.XMessage]*OutputState)
// Close all registered outputs when we exit.
defer func() {
for out := range registeredOutputs {
close(out)
}
}()
// If a submission is currently given to the callback via startCallback, these two variables are not nil. After the
// callback has finished, the channel will be closed.
var runningSubmission *streamSorterSubmission
var runningCallbackCh <-chan struct{}
for {
if (runningSubmission == nil) != (runningCallbackCh == nil) {
panic(fmt.Sprintf("inconsistent state: runningSubmission=%#v and runningCallbackCh=%#v",
runningSubmission, runningCallbackCh))
}
var nextSubmissionDue <-chan time.Time
if runningCallbackCh == nil {
if next := submissionHeap.Peek(); next != nil {
if submissionAge := time.Since(next.submitTime); submissionAge >= sorter.submissionMinAge {
runningCallbackCh = sorter.startCallback(next.msg, next.key)
runningSubmission = next
heap.Pop(&submissionHeap)
} else {
nextSubmissionDue = time.After(sorter.submissionMinAge - submissionAge)
}
}
}
select {
case out := <-sorter.registerOutCh:
sorter.verbose("worker: register output", zap.String("out", fmt.Sprint(out)))
if _, ok := registeredOutputs[out]; ok {
panic("attempting to register the same output channel twice")
}
registeredOutputs[out] = &OutputState{}
// This function is now responsible for closing out.
case out := <-sorter.closeOutCh:
if state := registeredOutputs[out]; state == nil {
panic("requested to close unknown output channel")
} else if state.pending > 0 {
// Still pending work, mark the output and wait for it to complete.
state.close = true
} else {
// Output can be closed and unregistered immediately
close(out)
delete(registeredOutputs, out)
}
case sub := <-sorter.submissionCh:
sorter.verbose("worker: push submission to heap", zap.Object("submission", sub))
if state := registeredOutputs[sub.out]; state == nil {
panic("submission for an unknown output channel")
} else {
state.pending++
heap.Push(&submissionHeap, sub)
}
case <-nextSubmissionDue:
// Loop to start processing of the next submission.
continue
case <-runningCallbackCh:
out := runningSubmission.out
out <- runningSubmission.msg
state := registeredOutputs[out]
state.pending--
if state.close && state.pending == 0 {
close(out)
delete(registeredOutputs, out)
}
runningCallbackCh = nil
runningSubmission = nil
case <-sorter.ctx.Done():
return
}
}
}
// submit a [redis.XMessage] to the StreamSorter.
func (sorter *StreamSorter) submit(msg redis.XMessage, key string, out chan<- redis.XMessage) error {
ms, seq, err := parseRedisStreamId(msg.ID)
if err != nil {
return errors.Wrap(err, "cannot parse Redis Stream ID")
}
submission := &streamSorterSubmission{
msg: msg,
key: key,
out: out,
streamIdMs: ms,
streamIdSeq: seq,
submitTime: time.Now(),
}
select {
case sorter.submissionCh <- submission:
return nil
case <-time.After(time.Second):
return errors.New("submission timed out")
case <-sorter.ctx.Done():
return sorter.ctx.Err()
}
}
// PipelineFunc implements the [history.StageFunc] type expected for a history sync pipeline stage.
//
// This method of a single StreamSorter can be inserted into multiple history sync pipelines and will forward all
// messages from in to out as expected from a pipeline stage. In between, all messages are processed by the
// StreamSorter, which correlates the messages from different pipelines and additionally passes them to a callback
// according to its specification (see the comment on the StreamSorter type).
func (sorter *StreamSorter) PipelineFunc(
ctx context.Context,
_ history.Sync,
key string,
in <-chan redis.XMessage,
out chan<- redis.XMessage,
) error {
// Register output channel with worker.
select {
case sorter.registerOutCh <- out:
// Success, worker is now responsible for closing the channel.
case <-ctx.Done():
close(out)
return ctx.Err()
case <-sorter.ctx.Done():
close(out)
return sorter.ctx.Err()
}
// If we exit, signal to the worker that no more work for this channel will be submitted.
defer func() {
select {
case sorter.closeOutCh <- out:
// Success, worker will close the output channel eventually.
case <-sorter.ctx.Done():
// Worker will quit entirely, closing all output channels.
}
}()
for {
select {
case msg, ok := <-in:
if !ok {
return nil
}
err := sorter.submit(msg, key, out)
if err != nil {
sorter.logger.Errorw("Failed to submit Redis stream event to stream sorter",
zap.String("key", key),
zap.Error(err))
}
case <-ctx.Done():
return ctx.Err()
case <-sorter.ctx.Done():
return sorter.ctx.Err()
}
}
}
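Outside the tests, the intended wiring of a StreamSorter looks roughly like the sketch below (not part of this diff; compare Client.SyncExtraStages above): one sorter's PipelineFunc serves as the extra stage of several pipelines, so their entries reach the callback in Redis Stream ID order. The function name exampleSorterStages and the choice of pipelines are made up; the placeholder callback only shows the return-value semantics.

package notifications

import (
	"context"

	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history"
)

// exampleSorterStages builds extra stages that funnel two history pipelines
// through a single StreamSorter. The callback is a placeholder; returning false
// would make the sorter retry the message with backoff.
func exampleSorterStages(ctx context.Context, logger *logging.Logger) map[string]history.StageFunc {
	sorter := NewStreamSorter(ctx, logger, func(msg redis.XMessage, key string) bool {
		// Handle the ordered message here, e.g. submit it somewhere.
		return true
	})

	return map[string]history.StageFunc{
		history.SyncPipelineState:    sorter.PipelineFunc,
		history.SyncPipelineDowntime: sorter.PipelineFunc,
	}
}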

View file

@ -0,0 +1,379 @@
// #nosec G404 -- Allow math/rand for the tests
package notifications
import (
"cmp"
"context"
"fmt"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icingadb/pkg/icingadb/history"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
)
func Test_parseRedisStreamId(t *testing.T) {
tests := []struct {
name string
input string
wantMs int64
wantSeq int64
wantErr bool
}{
{
name: "epoch",
input: "0-0",
},
{
name: "valid",
input: "1761658169701-0",
wantMs: 1761658169701,
},
{
name: "valid sequence",
input: "1761658169701-23",
wantMs: 1761658169701,
wantSeq: 23,
},
{
name: "invalid format",
input: "23-42-23",
wantErr: true,
},
{
name: "missing first part",
input: "-23",
wantErr: true,
},
{
name: "missing second part",
input: "23-",
wantErr: true,
},
{
name: "only dash",
input: "-",
wantErr: true,
},
{
name: "just invalid",
input: "oops",
wantErr: true,
},
{
name: "invalid field types",
input: "0x23-0x42",
wantErr: true,
},
{
name: "number too big",
input: "22222222222222222222222222222222222222222222222222222222222222222222222222222222222222222-0",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotMs, gotSeq, err := parseRedisStreamId(tt.input)
require.Equal(t, tt.wantErr, err != nil, "error differs %v", err)
require.Equal(t, tt.wantMs, gotMs, "ms from Redis Stream ID differs")
require.Equal(t, tt.wantSeq, gotSeq, "seq from Redis Stream ID differs")
})
}
}
func Test_streamSorterSubmissions(t *testing.T) {
mkSubmitTime := func(offset int) time.Time {
return time.Date(2009, 11, 10, 23, 0, 0, offset, time.UTC)
}
submissions := streamSorterSubmissions{
{streamIdMs: 0, streamIdSeq: 0, submitTime: mkSubmitTime(0)},
{streamIdMs: 1, streamIdSeq: 0, submitTime: mkSubmitTime(0)},
{streamIdMs: 1, streamIdSeq: 1, submitTime: mkSubmitTime(0)},
{streamIdMs: 2, streamIdSeq: 0, submitTime: mkSubmitTime(0)},
{streamIdMs: 2, streamIdSeq: 0, submitTime: mkSubmitTime(1)},
{streamIdMs: 3, streamIdSeq: 0, submitTime: mkSubmitTime(0)},
{streamIdMs: 3, streamIdSeq: 1, submitTime: mkSubmitTime(0)},
{streamIdMs: 3, streamIdSeq: 1, submitTime: mkSubmitTime(1)},
{streamIdMs: 3, streamIdSeq: 1, submitTime: mkSubmitTime(2)},
}
submissionsRand := make(streamSorterSubmissions, 0, len(submissions))
for _, i := range rand.Perm(len(submissions)) {
submissionsRand = append(submissionsRand, submissions[i])
}
sort.Sort(submissionsRand)
require.Equal(t, submissions, submissionsRand)
}
func TestStreamSorter(t *testing.T) {
tests := []struct {
name string
messages int
producers int
producersEarlyClose int
callbackMaxDelayMs int
callbackSuccessPercent int
expectTimeout bool
outMaxDelayMs int
}{
{
name: "baseline",
messages: 10,
producers: 1,
callbackSuccessPercent: 100,
},
{
name: "simple",
messages: 100,
producers: 10,
callbackSuccessPercent: 100,
},
{
name: "many producers",
messages: 100,
producers: 100,
callbackSuccessPercent: 100,
},
{
name: "many messages",
messages: 1000,
producers: 10,
callbackSuccessPercent: 100,
},
{
name: "callback a bit unreliable",
messages: 50,
producers: 10,
callbackSuccessPercent: 70,
},
{
name: "callback coin flip",
messages: 50,
producers: 10,
callbackSuccessPercent: 50,
},
{
name: "callback unreliable",
messages: 25,
producers: 5,
callbackSuccessPercent: 30,
},
{
name: "callback total rejection",
messages: 10,
producers: 1,
callbackSuccessPercent: 0,
expectTimeout: true,
},
{
name: "callback slow",
messages: 100,
producers: 10,
callbackMaxDelayMs: 3000,
callbackSuccessPercent: 100,
},
{
name: "out slow",
messages: 100,
producers: 10,
callbackSuccessPercent: 100,
outMaxDelayMs: 1000,
},
{
name: "producer out early close",
messages: 100,
producers: 10,
producersEarlyClose: 5,
callbackMaxDelayMs: 1000,
callbackSuccessPercent: 100,
},
{
name: "pure chaos",
messages: 50,
producers: 10,
callbackMaxDelayMs: 3000,
callbackSuccessPercent: 50,
outMaxDelayMs: 1000,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
// Callback functions after reordering
var (
callbackCollection []string
callbackCollectionMutex sync.Mutex
callbackFn = func(msg redis.XMessage, _ string) bool {
if tt.callbackMaxDelayMs > 0 {
time.Sleep(time.Duration(rand.Int63n(int64(tt.callbackMaxDelayMs))) * time.Microsecond)
}
if rand.Int63n(100)+1 > int64(tt.callbackSuccessPercent) {
return false
}
callbackCollectionMutex.Lock()
defer callbackCollectionMutex.Unlock()
callbackCollection = append(callbackCollection, msg.ID)
return true
}
)
// Out channel after reordering and callback
var (
outCounterCh = make(chan struct{})
outConsumer = func(out chan redis.XMessage) {
for {
if tt.outMaxDelayMs > 0 {
time.Sleep(time.Duration(rand.Int63n(int64(tt.outMaxDelayMs))) * time.Microsecond)
}
_, ok := <-out
if !ok {
return
}
outCounterCh <- struct{}{}
}
}
)
// Decreasing counter for messages to be sent
var (
inCounter = tt.messages
inCounterMutex sync.Mutex
)
sorter := NewStreamSorter(
t.Context(),
logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second),
callbackFn)
sorter.callbackMaxDelay = 100 * time.Millisecond
sorter.submissionMinAge = 50 * time.Millisecond
sorter.isVerbose = true
for i := range tt.producers {
earlyClose := i < tt.producersEarlyClose
in := make(chan redis.XMessage)
out := make(chan redis.XMessage)
go func() {
err := sorter.PipelineFunc(t.Context(), history.Sync{}, "", in, out)
// When closing down, both the test context and the in channel (the latter deferred) are closed. Within
// the PipelineFunc's select, both are checked and in most cases, the closed channel wins. However,
// sometimes a context close wins.
//
// So, ignore this kind of error. The checks below will still work and report if something is wrong.
if !errors.Is(err, context.Canceled) {
require.NoError(t, err)
}
}()
if !earlyClose {
defer close(in) // no leakage, general cleanup
}
go func() {
for {
time.Sleep(time.Duration(rand.Int63n(250)) * time.Microsecond)
inCounterMutex.Lock()
isFin := inCounter <= 0
if !isFin {
inCounter--
}
inCounterMutex.Unlock()
if isFin {
return
}
ms := time.Now().UnixMilli() + rand.Int63n(10) - 5
seq := rand.Int63n(1_000)
// Add 10% time travelers
if rand.Int63n(10) == 9 {
distanceMs := int64(5)
if rand.Int63n(2) > 0 {
// Don't go back too far. Otherwise, elements would be out of order - submissionMinAge.
ms -= distanceMs
} else {
ms += distanceMs
}
}
msg := redis.XMessage{ID: fmt.Sprintf("%d-%d", ms, seq)}
in <- msg
// 25% chance of closing for early closing producers
if earlyClose && rand.Int63n(4) == 3 {
close(in)
t.Log("closed producer early")
return
}
}
}()
go outConsumer(out)
}
var outCounter int
breakFor:
for {
select {
case <-outCounterCh:
outCounter++
if outCounter == tt.messages {
break breakFor
}
case <-time.After(3 * time.Second):
if tt.expectTimeout {
return
}
t.Fatalf("Collecting messages timed out after receiving %d out of %d messages",
outCounter, tt.messages)
}
}
if tt.expectTimeout {
t.Fatal("Timeout was expected")
}
t.Log("received all messages")
callbackCollectionMutex.Lock()
for i := 0; i < len(callbackCollection)-1; i++ {
parse := func(id string) (int64, int64) {
parts := strings.Split(id, "-")
ms, err1 := strconv.ParseInt(parts[0], 10, 64)
seq, err2 := strconv.ParseInt(parts[1], 10, 64)
require.NoError(t, cmp.Or(err1, err2))
return ms, seq
}
a, b := callbackCollection[i], callbackCollection[i+1]
aMs, aSeq := parse(a)
bMs, bSeq := parse(b)
switch {
case aMs < bMs:
case aMs == bMs:
if aSeq > bSeq {
t.Errorf("collection in wrong order: %q before %q", a, b)
}
case aMs > bMs:
t.Errorf("collection in wrong order: %q before %q", a, b)
}
}
callbackCollectionMutex.Unlock()
})
}
}