Minor Tweaks for Icinga Notifications Integration

- Don't validate notifications config in a background Goroutine.
- Clip pipeline slice so that appending cannot reuse its spare capacity twice.
- Rework notification Client.buildCommonEvent and dependent methods.
- Resubmit events after updating rules in one go.
- Simplify Client.fetchHostServiceName based on Julian's suggestion.

Co-Authored-By: Julian Brost <julian.brost@icinga.com>
This commit is contained in:
Alvar Penning 2025-10-21 16:36:04 +02:00
parent ad26a7857d
commit 1ec561415d
No known key found for this signature in database
5 changed files with 125 additions and 132 deletions

View file

@ -170,7 +170,7 @@ func run() int {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
go func() {
{
var (
callbackName string
callbackKeyStructPtr map[string]any
@ -180,29 +180,34 @@ func run() int {
if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
logger.Info("Starting Icinga Notifications source")
notificationsSource := notifications.NewNotificationsClient(
notificationsSource, err := notifications.NewNotificationsClient(
ctx,
db,
rc,
logs.GetChildLogger("notifications-source"),
cfg)
if err != nil {
logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err))
}
callbackName = "notifications_sync"
callbackKeyStructPtr = notifications.SyncKeyStructPtrs
callbackFn = notificationsSource.Submit
}
logger.Info("Starting history sync")
go func() {
logger.Info("Starting history sync")
if err := hs.Sync(
ctx,
callbackName,
callbackKeyStructPtr,
callbackFn,
); err != nil && !utils.IsContextCanceled(err) {
logger.Fatalf("%+v", err)
}
}()
if err := hs.Sync(
ctx,
callbackName,
callbackKeyStructPtr,
callbackFn,
); err != nil && !utils.IsContextCanceled(err) {
logger.Fatalf("%+v", err)
}
}()
}
// Main loop
for {

View file

@ -19,6 +19,7 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"reflect"
"slices"
"sync"
"time"
)
@ -97,7 +98,7 @@ func (s Sync) Sync(
// Shadowed variable to allow appending custom callbacks.
pipeline := pipeline
if hasCallbackStage {
pipeline = append(pipeline, makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn))
pipeline = append(slices.Clip(pipeline), makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn))
}
ch := make([]chan redis.XMessage, len(pipeline)+1)
@ -470,7 +471,7 @@ func makeCallbackStageFunc(
backlogLastId := ""
backlogMsgCounter := 0
const backlogTimerMinInterval, backlogTimerMaxInterval = 10 * time.Millisecond, time.Minute
const backlogTimerMinInterval, backlogTimerMaxInterval = time.Millisecond, time.Minute
backlogTimerInterval := backlogTimerMinInterval
backlogTimer := time.NewTimer(backlogTimerInterval)
_ = backlogTimer.Stop()

View file

@ -88,6 +88,9 @@ func (*HistoryDowntime) TableName() string {
return "history"
}
// DowntimeHistoryMeta is a combined HistoryMeta struct of DowntimeHistoryEntity and DowntimeHistory.
//
// It is used in the notifications package and became necessary as values of both structs were required.
type DowntimeHistoryMeta struct {
DowntimeHistoryEntity `json:",inline"`
DowntimeHistory `json:",inline"`

View file

@ -49,26 +49,25 @@ func NewNotificationsClient(
rc *redis.Client,
logger *logging.Logger,
cfg source.Config,
) *Client {
client := &Client{
) (*Client, error) {
notificationsClient, err := source.NewClient(cfg, "Icinga DB "+internal.Version.Version)
if err != nil {
return nil, err
}
return &Client{
Config: cfg,
db: db,
logger: logger,
rulesInfo: &source.RulesInfo{},
redisClient: rc,
ctx: ctx,
}
notificationsClient, err := source.NewClient(client.Config, fmt.Sprintf("Icinga DB %s", internal.Version.Version))
if err != nil {
logger.Fatalw("Cannot create Icinga Notifications client", zap.Error(err))
}
client.notificationsClient = notificationsClient
rulesInfo: &source.RulesInfo{},
return client
notificationsClient: notificationsClient,
redisClient: rc,
}, nil
}
// evaluateRulesForObject returns the rule IDs for each matching query.
@ -124,34 +123,36 @@ func (client *Client) evaluateRulesForObject(ctx context.Context, entity databas
return outRuleIds, nil
}
// buildCommonEvent creates an event.Event based on Host and (optional) Service names.
// buildCommonEvent creates an event.Event based on Host and (optional) Service IDs.
//
// This function is used by all event builders to create a common event structure that includes the host and service
// names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event.
// names, an Icinga DB Web reference, and the tags for the event.
// Any event type-specific information (like severity, message, etc.) is added by the specific event builders.
func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
func (client *Client) buildCommonEvent(
ctx context.Context,
hostId, serviceId types.Binary,
) (*event.Event, *redisLookupResult, error) {
rlr, err := client.fetchHostServiceName(ctx, hostId, serviceId)
if err != nil {
return nil, nil, err
}
var (
objectName string
objectUrl url.URL
objectUrl string
objectTags map[string]string
)
if rlr.ServiceName != "" {
objectName = rlr.HostName + "!" + rlr.ServiceName
objectUrl.Path = "/icingadb/service"
objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
objectTags = map[string]string{
"host": rlr.HostName,
"service": rlr.ServiceName,
}
} else {
objectName = rlr.HostName
objectUrl.Path = "/icingadb/host"
objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName)
objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(rlr.HostName)
objectTags = map[string]string{
"host": rlr.HostName,
}
@ -159,9 +160,9 @@ func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, er
return &event.Event{
Name: objectName,
URL: objectUrl.String(),
URL: objectUrl,
Tags: objectTags,
}, nil
}, rlr, nil
}
// buildStateHistoryEvent builds a fully initialized event.Event from a state history entry.
@ -169,19 +170,14 @@ func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, er
// The resulting event will have all the necessary information for a state change event, and must
// not be further modified by the caller.
func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
ev, rlr, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
ev.Type = event.TypeState
if res.ServiceName != "" {
if rlr.ServiceName != "" {
switch h.HardState {
case 0:
ev.Severity = event.SeverityOK
@ -217,14 +213,9 @@ func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.S
// buildDowntimeHistoryMetaEvent from a downtime history entry.
func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
switch h.EventType {
@ -236,6 +227,7 @@ func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1hi
ev.MuteReason = "Checkable is in downtime"
case "downtime_end":
ev.Mute = types.MakeBool(false)
if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool {
ev.Type = event.TypeDowntimeRemoved
ev.Message = "Downtime was cancelled"
@ -257,14 +249,9 @@ func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1hi
// buildFlappingHistoryEvent from a flapping history entry.
func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
if h.PercentStateChangeEnd.Valid {
@ -288,14 +275,9 @@ func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1histor
// buildAcknowledgementHistoryEvent from an acknowledgment history entry.
func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId)
}
if !h.ClearTime.Time().IsZero() {
@ -334,9 +316,6 @@ func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v
//
// This method uses the Client's logger.
func (client *Client) Submit(entity database.Entity) bool {
client.submissionMutex.Lock()
defer client.submissionMutex.Unlock()
if client.ctx.Err() != nil {
client.logger.Errorw("Cannot process submitted entity as client context is done", zap.Error(client.ctx.Err()))
return true
@ -345,7 +324,7 @@ func (client *Client) Submit(entity database.Entity) bool {
var ev *event.Event
var eventErr error
// Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go
// Keep the type switch in sync with the values of SyncKeyStructPtrs below.
switch h := entity.(type) {
case *v1history.AcknowledgementHistory:
ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h)
@ -388,36 +367,48 @@ func (client *Client) Submit(entity database.Entity) bool {
}),
))
eventRuleIds, err := client.evaluateRulesForObject(client.ctx, entity)
if err != nil {
// While returning false would be more correct, this would result in never being able to refetch new rule
// versions. Consider an invalid object filter expression, which is now impossible to get rid of.
eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err))
eventRuleIds = []string{}
// The following code accesses Client.rulesInfo.
client.submissionMutex.Lock()
defer client.submissionMutex.Unlock()
// This loop allows resubmitting an event if the rules have changed. The first try would be the rule update, the
// second try would be the resubmit, and the third try would be for bad luck, e.g., when a second rule update just
// crept in between. If there are three subsequent rule updates, something is wrong.
for try := 0; try < 3; try++ {
eventRuleIds, err := client.evaluateRulesForObject(client.ctx, entity)
if err != nil {
// While returning false would be more correct, this would result in never being able to refetch new rule
// versions. Consider an invalid object filter expression, which is now impossible to get rid of.
eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err))
eventRuleIds = []string{}
}
ev.RulesVersion = client.rulesInfo.Version
ev.RuleIds = eventRuleIds
newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev)
if errors.Is(err, source.ErrRulesOutdated) {
eventLogger.Infow("Received a rule update from Icinga Notification, resubmitting event",
zap.String("old_rules_version", client.rulesInfo.Version),
zap.String("new_rules_version", newEventRules.Version))
client.rulesInfo = newEventRules
continue
} else if err != nil {
eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried",
zap.String("rules_version", client.rulesInfo.Version),
zap.Any("rules", eventRuleIds),
zap.Error(err))
return false
}
eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds))
return true
}
ev.RulesVersion = client.rulesInfo.Version
ev.RuleIds = eventRuleIds
newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev)
if errors.Is(err, source.ErrRulesOutdated) {
eventLogger.Infow("Cannot submit event to Icinga Notifications due to rule changes, will be retried",
zap.String("old_rules_version", client.rulesInfo.Version),
zap.String("new_rules_version", newEventRules.Version))
client.rulesInfo = newEventRules
return false
} else if err != nil {
eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried",
zap.String("rules_version", client.rulesInfo.Version),
zap.Any("rules", eventRuleIds),
zap.Error(err))
return false
}
eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds))
return true
eventLogger.Error("Received three rule updates from Icinga Notifications in a row, event will be retried")
return false
}
var SyncKeyStructPtrs = map[string]any{

View file

@ -24,40 +24,52 @@ import (
// request and return an error indicating that the operation timed out. In case of the serviceId being set, the
// maximum execution time of the Redis HGet commands is 10s (5s for each HGet call).
func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
redisHGet := func(typ, field string, out *redisLookupResult) error {
getNameFromRedis := func(typ, id string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var data string
err := retry.WithBackoff(
ctx,
func(ctx context.Context) error { return client.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) },
func(ctx context.Context) (err error) {
data, err = client.redisClient.HGet(ctx, "icinga:"+typ, id).Result()
return
},
retry.Retryable,
backoff.DefaultBackoff,
retry.Settings{},
)
if err != nil {
if errors.Is(err, redis.Nil) {
return fmt.Errorf("%s with ID %s not found in Redis", typ, hostId)
return "", fmt.Errorf("%s with ID %s not found in Redis", typ, hostId)
}
return fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, field, err)
return "", fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, id, err)
}
return nil
var result struct {
Name string `json:"name"`
}
if err := json.Unmarshal([]byte(data), &result); err != nil {
return "", fmt.Errorf("failed to unmarshal redis result: %w", err)
}
return result.Name, nil
}
var result redisLookupResult
if err := redisHGet("host", hostId.String(), &result); err != nil {
var err error
result.HostName, err = getNameFromRedis("host", hostId.String())
if err != nil {
return nil, err
}
result.HostName = result.Name
result.Name = "" // Clear the name field for the host, as we will fetch the service name next.
if serviceId != nil {
if err := redisHGet("service", serviceId.String(), &result); err != nil {
result.ServiceName, err = getNameFromRedis("service", serviceId.String())
if err != nil {
return nil, err
}
result.ServiceName = result.Name
result.Name = "" // It's not needed anymore, clear it!
}
return &result, nil
@ -67,23 +79,4 @@ func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceI
type redisLookupResult struct {
HostName string `json:"-"` // Name of the host (never empty).
ServiceName string `json:"-"` // Name of the service (only set in service context).
// Name is used to retrieve the host or service name from Redis.
// It should not be used for any other purpose apart from within the [Client.fetchHostServiceName] function.
Name string `json:"name"`
}
// UnmarshalBinary implements the [encoding.BinaryUnmarshaler] interface for redisLookupResult.
//
// It unmarshals the binary data of the Redis HGet result into the redisLookupResult struct.
// This is required for the HGet().Scan() usage in the [Client.fetchHostServiceName] function to work correctly.
func (rlr *redisLookupResult) UnmarshalBinary(data []byte) error {
if len(data) == 0 {
return errors.New("empty data received for redisLookupResult")
}
if err := json.Unmarshal(data, rlr); err != nil {
return fmt.Errorf("failed to unmarshal redis result: %w", err)
}
return nil
}