notifications: IGL Changes For Rules

The rules and rule version is now part of the Event.

Also rename the Client method receiver variable.
This commit is contained in:
Alvar Penning 2025-09-09 12:09:02 +02:00
parent 697eca139d
commit 3a7e1f4aff
No known key found for this signature in database
2 changed files with 48 additions and 45 deletions

View file

@ -132,10 +132,10 @@ func NewNotificationsClient(
// > select * from host where id = :host_id and environment_id = :environment_id and name like 'prefix_%'
//
// The :host_id and :environment_id parameters will be bound to the entity's ID and EnvironmentId fields, respectively.
func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) {
outRuleIds := make([]int64, 0, len(s.rules.Rules))
func (client *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) {
outRuleIds := make([]int64, 0, len(client.rules.Rules))
for rule := range s.rules.Iter() {
for rule := range client.rules.Iter() {
if rule.ObjectFilterExpr == "" {
outRuleIds = append(outRuleIds, rule.Id)
continue
@ -148,7 +148,7 @@ func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Ent
if err != nil {
return false, errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr)
}
rows, err := s.db.NamedQueryContext(ctx, s.db.Rebind(query), entity)
rows, err := client.db.NamedQueryContext(ctx, client.db.Rebind(query), entity)
if err != nil {
return false, err
}
@ -175,7 +175,7 @@ func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Ent
// This function is used by all event builders to create a common event structure that includes the host and service
// names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event.
// Any event type-specific information (like severity, message, etc.) is added by the specific event builders.
func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
var (
objectName string
objectUrl *url.URL
@ -185,7 +185,7 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
if rlr.ServiceName != "" {
objectName = rlr.HostName + "!" + rlr.ServiceName
objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
objectUrl = client.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
objectTags = map[string]string{
@ -195,7 +195,7 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
} else {
objectName = rlr.HostName
objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
objectUrl = client.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName)
objectTags = map[string]string{
@ -214,13 +214,13 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
//
// The resulted event will have all the necessary information for a state change event, and must
// not be further modified by the caller.
func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := s.buildCommonEvent(res)
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
}
@ -262,13 +262,13 @@ func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH
}
// buildDowntimeHistoryMetaEvent from a downtime history entry.
func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := s.buildCommonEvent(res)
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
}
@ -302,13 +302,13 @@ func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history
}
// buildFlappingHistoryEvent from a flapping history entry.
func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := s.buildCommonEvent(res)
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
}
@ -333,13 +333,13 @@ func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla
}
// buildAcknowledgementHistoryEvent from an acknowledgment history entry.
func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
if err != nil {
return nil, err
}
ev, err := s.buildCommonEvent(res)
ev, err := client.buildCommonEvent(res)
if err != nil {
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
}
@ -371,17 +371,17 @@ func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1hist
}
// worker is the background worker launched by NewNotificationsClient.
func (s *Client) worker() {
defer s.ctxCancel()
func (client *Client) worker() {
defer client.ctxCancel()
for {
select {
case <-s.ctx.Done():
case <-client.ctx.Done():
return
case sub, more := <-s.inputCh:
case sub, more := <-client.inputCh:
if !more { // Should never happen, but just in case.
s.logger.Debug("Input channel closed, stopping worker")
client.logger.Debug("Input channel closed, stopping worker")
return
}
@ -393,40 +393,40 @@ func (s *Client) worker() {
// Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go
switch h := sub.entity.(type) {
case *v1history.AcknowledgementHistory:
ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h)
ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h)
case *v1history.DowntimeHistoryMeta:
ev, eventErr = s.buildDowntimeHistoryMetaEvent(s.ctx, h)
ev, eventErr = client.buildDowntimeHistoryMetaEvent(client.ctx, h)
case *v1history.FlappingHistory:
ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h)
ev, eventErr = client.buildFlappingHistoryEvent(client.ctx, h)
case *v1history.StateHistory:
if h.StateType != common.HardState {
continue
}
ev, eventErr = s.buildStateHistoryEvent(s.ctx, h)
ev, eventErr = client.buildStateHistoryEvent(client.ctx, h)
default:
s.logger.Error("Cannot process unsupported type",
client.logger.Error("Cannot process unsupported type",
zap.Object("submission", sub),
zap.String("type", fmt.Sprintf("%T", h)))
continue
}
if eventErr != nil {
s.logger.Errorw("Cannot build event from history entry",
client.logger.Errorw("Cannot build event from history entry",
zap.Object("submission", sub),
zap.String("type", fmt.Sprintf("%T", sub.entity)),
zap.Error(eventErr))
continue
} else if ev == nil {
// This really should not happen.
s.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub))
client.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub))
continue
}
eventLogger := s.logger.With(zap.Object(
eventLogger := client.logger.With(zap.Object(
"event",
zapcore.ObjectMarshalerFunc(func(encoder zapcore.ObjectEncoder) error {
encoder.AddString("name", ev.Name)
@ -438,7 +438,7 @@ func (s *Client) worker() {
sub.traces["evaluate_jump_pre"] = time.Now()
reevaluateRules:
sub.traces["evaluate_jump_last"] = time.Now()
eventRuleIds, err := s.evaluateRulesForObject(s.ctx, sub.entity)
eventRuleIds, err := client.evaluateRulesForObject(client.ctx, sub.entity)
if err != nil {
eventLogger.Errorw("Cannot evaluate rules for event",
zap.Object("submission", sub),
@ -446,21 +446,24 @@ func (s *Client) worker() {
continue
}
ev.RulesVersion = client.rules.Version
ev.RuleIds = eventRuleIds
sub.traces["process_last"] = time.Now()
newEventRules, err := s.notificationsClient.ProcessEvent(s.ctx, ev, s.rules.Version, eventRuleIds...)
newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev)
if errors.Is(err, source.ErrRulesOutdated) {
s.rules = newEventRules
client.rules = newEventRules
eventLogger.Infow("Re-evaluating rules for event after fetching new rules",
zap.Object("submission", sub),
zap.String("rules_version", s.rules.Version))
zap.String("rules_version", client.rules.Version))
// Re-evaluate the just fetched rules for the current event.
goto reevaluateRules
} else if err != nil {
eventLogger.Errorw("Cannot submit event to Icinga Notifications",
zap.Object("submission", sub),
zap.String("rules_version", s.rules.Version),
zap.String("rules_version", client.rules.Version),
zap.Any("rules", eventRuleIds),
zap.Error(err))
continue
@ -478,7 +481,7 @@ func (s *Client) worker() {
//
// Internally, a buffered channel is used for delivery. So this function should not block. Otherwise, it will abort
// after a second and an error is logged.
func (s *Client) Submit(entity database.Entity) {
func (client *Client) Submit(entity database.Entity) {
sub := submission{
entity: entity,
traces: map[string]time.Time{
@ -487,17 +490,17 @@ func (s *Client) Submit(entity database.Entity) {
}
select {
case <-s.ctx.Done():
s.logger.Errorw("Client context is done, rejecting submission",
case <-client.ctx.Done():
client.logger.Errorw("Client context is done, rejecting submission",
zap.Object("submission", sub),
zap.Error(s.ctx.Err()))
zap.Error(client.ctx.Err()))
return
case s.inputCh <- sub:
case client.inputCh <- sub:
return
case <-time.After(time.Second):
s.logger.Error("Client submission channel is blocking, rejecting submission",
client.logger.Error("Client submission channel is blocking, rejecting submission",
zap.Object("submission", sub))
return
}

View file

@ -23,14 +23,14 @@ import (
// If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the
// request and return an error indicating that the operation timed out. In case of the serviceId being set, the
// maximum execution time of the Redis HGet commands is 10s (5s for each HGet call).
func (s *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
redisHGet := func(typ, field string, out *redisLookupResult) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := retry.WithBackoff(
ctx,
func(ctx context.Context) error { return s.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) },
func(ctx context.Context) error { return client.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) },
retry.Retryable,
backoff.DefaultBackoff,
retry.Settings{},