diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go index 1a0ddd23..f150cf2b 100644 --- a/pkg/notifications/notifications.go +++ b/pkg/notifications/notifications.go @@ -132,10 +132,10 @@ func NewNotificationsClient( // > select * from host where id = :host_id and environment_id = :environment_id and name like 'prefix_%' // // The :host_id and :environment_id parameters will be bound to the entity's ID and EnvironmentId fields, respectively. -func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) { - outRuleIds := make([]int64, 0, len(s.rules.Rules)) +func (client *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) { + outRuleIds := make([]int64, 0, len(client.rules.Rules)) - for rule := range s.rules.Iter() { + for rule := range client.rules.Iter() { if rule.ObjectFilterExpr == "" { outRuleIds = append(outRuleIds, rule.Id) continue @@ -148,7 +148,7 @@ func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Ent if err != nil { return false, errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr) } - rows, err := s.db.NamedQueryContext(ctx, s.db.Rebind(query), entity) + rows, err := client.db.NamedQueryContext(ctx, client.db.Rebind(query), entity) if err != nil { return false, err } @@ -175,7 +175,7 @@ func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Ent // This function is used by all event builders to create a common event structure that includes the host and service // names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event. // Any event type-specific information (like severity, message, etc.) is added by the specific event builders. -func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) { +func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) { var ( objectName string objectUrl *url.URL @@ -185,7 +185,7 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) if rlr.ServiceName != "" { objectName = rlr.HostName + "!" + rlr.ServiceName - objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service") + objectUrl = client.notificationsClient.JoinIcingaWeb2Path("/icingadb/service") objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName) objectTags = map[string]string{ @@ -195,7 +195,7 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) } else { objectName = rlr.HostName - objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host") + objectUrl = client.notificationsClient.JoinIcingaWeb2Path("/icingadb/host") objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName) objectTags = map[string]string{ @@ -214,13 +214,13 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) // // The resulted event will have all the necessary information for a state change event, and must // not be further modified by the caller. -func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { - res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) +func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { + res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err } - ev, err := s.buildCommonEvent(res) + ev, err := client.buildCommonEvent(res) if err != nil { return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } @@ -262,13 +262,13 @@ func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH } // buildDowntimeHistoryMetaEvent from a downtime history entry. -func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) { - res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) +func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) { + res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err } - ev, err := s.buildCommonEvent(res) + ev, err := client.buildCommonEvent(res) if err != nil { return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } @@ -302,13 +302,13 @@ func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history } // buildFlappingHistoryEvent from a flapping history entry. -func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { - res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) +func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { + res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err } - ev, err := s.buildCommonEvent(res) + ev, err := client.buildCommonEvent(res) if err != nil { return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } @@ -333,13 +333,13 @@ func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla } // buildAcknowledgementHistoryEvent from an acknowledgment history entry. -func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { - res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) +func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { + res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err } - ev, err := s.buildCommonEvent(res) + ev, err := client.buildCommonEvent(res) if err != nil { return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } @@ -371,17 +371,17 @@ func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1hist } // worker is the background worker launched by NewNotificationsClient. -func (s *Client) worker() { - defer s.ctxCancel() +func (client *Client) worker() { + defer client.ctxCancel() for { select { - case <-s.ctx.Done(): + case <-client.ctx.Done(): return - case sub, more := <-s.inputCh: + case sub, more := <-client.inputCh: if !more { // Should never happen, but just in case. - s.logger.Debug("Input channel closed, stopping worker") + client.logger.Debug("Input channel closed, stopping worker") return } @@ -393,40 +393,40 @@ func (s *Client) worker() { // Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go switch h := sub.entity.(type) { case *v1history.AcknowledgementHistory: - ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h) + ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h) case *v1history.DowntimeHistoryMeta: - ev, eventErr = s.buildDowntimeHistoryMetaEvent(s.ctx, h) + ev, eventErr = client.buildDowntimeHistoryMetaEvent(client.ctx, h) case *v1history.FlappingHistory: - ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h) + ev, eventErr = client.buildFlappingHistoryEvent(client.ctx, h) case *v1history.StateHistory: if h.StateType != common.HardState { continue } - ev, eventErr = s.buildStateHistoryEvent(s.ctx, h) + ev, eventErr = client.buildStateHistoryEvent(client.ctx, h) default: - s.logger.Error("Cannot process unsupported type", + client.logger.Error("Cannot process unsupported type", zap.Object("submission", sub), zap.String("type", fmt.Sprintf("%T", h))) continue } if eventErr != nil { - s.logger.Errorw("Cannot build event from history entry", + client.logger.Errorw("Cannot build event from history entry", zap.Object("submission", sub), zap.String("type", fmt.Sprintf("%T", sub.entity)), zap.Error(eventErr)) continue } else if ev == nil { // This really should not happen. - s.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub)) + client.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub)) continue } - eventLogger := s.logger.With(zap.Object( + eventLogger := client.logger.With(zap.Object( "event", zapcore.ObjectMarshalerFunc(func(encoder zapcore.ObjectEncoder) error { encoder.AddString("name", ev.Name) @@ -438,7 +438,7 @@ func (s *Client) worker() { sub.traces["evaluate_jump_pre"] = time.Now() reevaluateRules: sub.traces["evaluate_jump_last"] = time.Now() - eventRuleIds, err := s.evaluateRulesForObject(s.ctx, sub.entity) + eventRuleIds, err := client.evaluateRulesForObject(client.ctx, sub.entity) if err != nil { eventLogger.Errorw("Cannot evaluate rules for event", zap.Object("submission", sub), @@ -446,21 +446,24 @@ func (s *Client) worker() { continue } + ev.RulesVersion = client.rules.Version + ev.RuleIds = eventRuleIds + sub.traces["process_last"] = time.Now() - newEventRules, err := s.notificationsClient.ProcessEvent(s.ctx, ev, s.rules.Version, eventRuleIds...) + newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev) if errors.Is(err, source.ErrRulesOutdated) { - s.rules = newEventRules + client.rules = newEventRules eventLogger.Infow("Re-evaluating rules for event after fetching new rules", zap.Object("submission", sub), - zap.String("rules_version", s.rules.Version)) + zap.String("rules_version", client.rules.Version)) // Re-evaluate the just fetched rules for the current event. goto reevaluateRules } else if err != nil { eventLogger.Errorw("Cannot submit event to Icinga Notifications", zap.Object("submission", sub), - zap.String("rules_version", s.rules.Version), + zap.String("rules_version", client.rules.Version), zap.Any("rules", eventRuleIds), zap.Error(err)) continue @@ -478,7 +481,7 @@ func (s *Client) worker() { // // Internally, a buffered channel is used for delivery. So this function should not block. Otherwise, it will abort // after a second and an error is logged. -func (s *Client) Submit(entity database.Entity) { +func (client *Client) Submit(entity database.Entity) { sub := submission{ entity: entity, traces: map[string]time.Time{ @@ -487,17 +490,17 @@ func (s *Client) Submit(entity database.Entity) { } select { - case <-s.ctx.Done(): - s.logger.Errorw("Client context is done, rejecting submission", + case <-client.ctx.Done(): + client.logger.Errorw("Client context is done, rejecting submission", zap.Object("submission", sub), - zap.Error(s.ctx.Err())) + zap.Error(client.ctx.Err())) return - case s.inputCh <- sub: + case client.inputCh <- sub: return case <-time.After(time.Second): - s.logger.Error("Client submission channel is blocking, rejecting submission", + client.logger.Error("Client submission channel is blocking, rejecting submission", zap.Object("submission", sub)) return } diff --git a/pkg/notifications/redis_fetch.go b/pkg/notifications/redis_fetch.go index 7d7ec61d..3cacde16 100644 --- a/pkg/notifications/redis_fetch.go +++ b/pkg/notifications/redis_fetch.go @@ -23,14 +23,14 @@ import ( // If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the // request and return an error indicating that the operation timed out. In case of the serviceId being set, the // maximum execution time of the Redis HGet commands is 10s (5s for each HGet call). -func (s *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { +func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { redisHGet := func(typ, field string, out *redisLookupResult) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err := retry.WithBackoff( ctx, - func(ctx context.Context) error { return s.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) }, + func(ctx context.Context) error { return client.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) }, retry.Retryable, backoff.DefaultBackoff, retry.Settings{},