From 697eca139d40cd2ca89d741d964caf1efe4fe8ae Mon Sep 17 00:00:00 2001
From: Alvar Penning
Date: Fri, 5 Sep 2025 13:52:38 +0200
Subject: [PATCH] Notifications: Address Code Review

- Bump IGL to latest changes in Icinga/icinga-go-library#145.
- Allow specifying which pipeline keys are relevant, ignoring others.
- Allow specifying which pipeline key should be parsed into which type.
- Create history.DowntimeHistoryMeta as a chimera combining
  history.DowntimeHistory and history.HistoryDowntime to allow accessing
  event_type, distinguishing between downtime_start and downtime_end.
- Trace times for submission steps in the worker. It turns out the
  single-threaded worker blocks roughly two seconds for each
  Client.ProcessEvent method call. This might add up to minutes if lots
  of events are processed at once. My current theory is that the delay
  results from the expensive bcrypt hash comparison on the Notifications
  side.
---
 cmd/icingadb/main.go                |  31 ++--
 internal/config/config.go           |  12 +-
 pkg/icingadb/history/sync.go        |  89 +++++----
 pkg/icingadb/v1/history/downtime.go |   6 +
 pkg/notifications/notifications.go  | 274 +++++++++++++++++-----------
 pkg/notifications/redis_fetch.go    |   6 +-
 6 files changed, 250 insertions(+), 168 deletions(-)

diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go
index c7fd403c..27cb58f2 100644
--- a/cmd/icingadb/main.go
+++ b/cmd/icingadb/main.go
@@ -170,23 +170,26 @@ func run() int {
 	sig := make(chan os.Signal, 1)
 	signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
 
-	var notificationsSourceCallback func(database.Entity)
-	if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
-		logger.Info("Starting Icinga Notifications source")
-
-		notificationsSource := notifications.NewNotificationsSource(
-			ctx,
-			db,
-			rc,
-			logs.GetChildLogger("notifications-source"),
-			cfg)
-		notificationsSourceCallback = notificationsSource.Submit
-	}
-
 	go func() {
+		var callback func(database.Entity)
+		var callbackKeyStructPtr map[string]any
+
+		if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
+			logger.Info("Starting Icinga Notifications source")
+
+			notificationsSource := notifications.NewNotificationsClient(
+				ctx,
+				db,
+				rc,
+				logs.GetChildLogger("notifications-source"),
+				cfg)
+			callback = notificationsSource.Submit
+			callbackKeyStructPtr = notifications.SyncKeyStructPtrs
+		}
+
 		logger.Info("Starting history sync")
-		if err := hs.Sync(ctx, notificationsSourceCallback); err != nil && !utils.IsContextCanceled(err) {
+		if err := hs.Sync(ctx, callbackKeyStructPtr, callback); err != nil && !utils.IsContextCanceled(err) {
 			logger.Fatalf("%+v", err)
 		}
 	}()
diff --git a/internal/config/config.go b/internal/config/config.go
index 618a85fa..359cf052 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -4,7 +4,7 @@ import (
 	"github.com/creasty/defaults"
 	"github.com/icinga/icinga-go-library/database"
 	"github.com/icinga/icinga-go-library/logging"
-	"github.com/icinga/icinga-go-library/notifications"
+	"github.com/icinga/icinga-go-library/notifications/source"
 	"github.com/icinga/icinga-go-library/redis"
 	"github.com/icinga/icingadb/pkg/icingadb/history"
 	"github.com/pkg/errors"
@@ -16,11 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml"
 
 // Config defines Icinga DB config.
 type Config struct {
-	Database            database.Config      `yaml:"database" envPrefix:"DATABASE_"`
-	Redis               redis.Config         `yaml:"redis" envPrefix:"REDIS_"`
-	Logging             logging.Config       `yaml:"logging" envPrefix:"LOGGING_"`
-	Retention           RetentionConfig      `yaml:"retention" envPrefix:"RETENTION_"`
-	NotificationsSource notifications.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
+	Database            database.Config `yaml:"database" envPrefix:"DATABASE_"`
+	Redis               redis.Config    `yaml:"redis" envPrefix:"REDIS_"`
+	Logging             logging.Config  `yaml:"logging" envPrefix:"LOGGING_"`
+	Retention           RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
+	NotificationsSource source.Config   `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
 }
 
 func (c *Config) SetDefaults() {
diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go
index 30c7f5a2..ac1c28e6 100644
--- a/pkg/icingadb/history/sync.go
+++ b/pkg/icingadb/history/sync.go
@@ -39,9 +39,15 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
 
 // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
 //
-// If not nil, the callback function is appended to each synchronization pipeline and called before the entry is deleted
-// from Redis.
-func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
+// An optional callback and callbackKeyStructPtr might be given. Both must either be nil or non-nil.
+//
+// The callbackKeyStructPtr map says which pipeline keys should be mapped to which type, identified by a struct
+// pointer. If a key is missing from the map, it will not be used for the callback. The callback itself must not block.
+func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, callback func(database.Entity)) error {
+	if (callbackKeyStructPtr == nil) != (callback == nil) {
+		return fmt.Errorf("callbackKeyStructPtr and callback must either both be nil or both be non-nil")
+	}
+
 	g, ctx := errgroup.WithContext(ctx)
 
 	for key, pipeline := range syncPipelines {
@@ -67,8 +73,13 @@ func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
 		// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
 		// executed successfully.
 
-		if callback != nil {
-			pipeline = append(pipeline, makeCallbackStageFunc(callback))
+		// Shadow the loop variable to allow appending custom callbacks without modifying syncPipelines.
+		pipeline := pipeline
+		if callbackKeyStructPtr != nil {
+			_, ok := callbackKeyStructPtr[key]
+			if ok {
+				pipeline = append(pipeline, makeCallbackStageFunc(callbackKeyStructPtr, callback))
+			}
 		}
 
 		ch := make([]chan redis.XMessage, len(pipeline)+1)
@@ -371,28 +382,17 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
 
 // makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message.
 //
+// The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key.
+//
 // The callback call is blocking and the message will be forwarded to the out channel after the function has returned.
 // Thus, please ensure this function does not block too long.
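+//
+// An illustrative wiring, assuming a callback that hands the entity off without blocking; the key-to-type mapping
+// shown here is hypothetical and must use keys from syncPipelines:
+//
+//	stage := makeCallbackStageFunc(
+//		map[string]any{SyncPipelineState: (*v1.StateHistory)(nil)},
+//		func(entity database.Entity) { queue <- entity }, // queue: a sufficiently buffered channel
+//	)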
-func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
+func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.Entity)) stageFunc {
 	return func(ctx context.Context, _ Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
 		defer close(out)
 
-		var structPtr database.Entity
-		switch key { // keep in sync with syncPipelines below
-		case "notification":
-			structPtr = (*v1.NotificationHistory)(nil)
-		case "state":
-			structPtr = (*v1.StateHistory)(nil)
-		case "downtime":
-			structPtr = (*v1.DowntimeHistory)(nil)
-		case "comment":
-			structPtr = (*v1.CommentHistory)(nil)
-		case "flapping":
-			structPtr = (*v1.FlappingHistory)(nil)
-		case "acknowledgement":
-			structPtr = (*v1.AcknowledgementHistory)(nil)
-		default:
-			return fmt.Errorf("unsupported key %q", key)
+		structPtr, ok := keyStructPtrs[key]
+		if !ok {
+			return fmt.Errorf("can't look up struct pointer for key %q", key)
 		}
 
 		structifier := structify.MakeMapStructifier(
@@ -409,7 +409,7 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
 
 			val, err := structifier(msg.Values)
 			if err != nil {
-				return errors.Wrapf(err, "can't structify values %#v for %s", msg.Values, key)
+				return errors.Wrapf(err, "can't structify values %#v for %q", msg.Values, key)
 			}
 
 			entity, ok := val.(database.Entity)
@@ -427,32 +427,41 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
 	}
 }
 
+const (
+	SyncPipelineAcknowledgement = "acknowledgement"
+	SyncPipelineComment         = "comment"
+	SyncPipelineDowntime        = "downtime"
+	SyncPipelineFlapping        = "flapping"
+	SyncPipelineNotification    = "notification"
+	SyncPipelineState           = "state"
+)
+
 var syncPipelines = map[string][]stageFunc{
-	"notification": {
-		writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
-		userNotificationStage,                               // user_notification_history (depends on notification_history)
-		writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
+	SyncPipelineAcknowledgement: {
+		writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
+		writeOneEntityStage((*v1.HistoryAck)(nil)),             // history (depends on acknowledgement_history)
 	},
-	"state": {
-		writeOneEntityStage((*v1.StateHistory)(nil)),   // state_history
-		writeOneEntityStage((*v1.HistoryState)(nil)),   // history (depends on state_history)
-		writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
+	SyncPipelineComment: {
+		writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
+		writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
 	},
-	"downtime": {
+	SyncPipelineDowntime: {
 		writeOneEntityStage((*v1.DowntimeHistory)(nil)),    // downtime_history
 		writeOneEntityStage((*v1.HistoryDowntime)(nil)),    // history (depends on downtime_history)
 		writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime
 	},
-	"comment": {
-		writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
-		writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
-	},
-	"flapping": {
+	SyncPipelineFlapping: {
 		writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history
 		writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
 	},
-	"acknowledgement": {
-		writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
-		writeOneEntityStage((*v1.HistoryAck)(nil)),             // history (depends on acknowledgement_history)
+	SyncPipelineNotification: {
+		writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
+		userNotificationStage,                               // user_notification_history (depends on notification_history)
+		writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
+	},
+	SyncPipelineState: {
+		writeOneEntityStage((*v1.StateHistory)(nil)),   // state_history
+		writeOneEntityStage((*v1.HistoryState)(nil)),   // history (depends on state_history)
+		writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
 	},
 }
diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go
index 969cd472..bbbfbbc1 100644
--- a/pkg/icingadb/v1/history/downtime.go
+++ b/pkg/icingadb/v1/history/downtime.go
@@ -88,6 +88,12 @@ func (*HistoryDowntime) TableName() string {
 	return "history"
 }
 
+type DowntimeHistoryMeta struct {
+	DowntimeHistoryEntity `json:",inline"`
+	DowntimeHistory       `json:",inline"`
+	HistoryMeta           `json:",inline"`
+}
+
 type SlaHistoryDowntime struct {
 	DowntimeHistoryEntity `json:",inline"`
 	HistoryTableMeta      `json:",inline"`
diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go
index 5a469ece..1a0ddd23 100644
--- a/pkg/notifications/notifications.go
+++ b/pkg/notifications/notifications.go
@@ -2,80 +2,119 @@ package notifications
 
 import (
 	"context"
-	"database/sql"
 	"fmt"
 	"net/url"
 	"time"
 
 	"github.com/icinga/icinga-go-library/database"
 	"github.com/icinga/icinga-go-library/logging"
-	"github.com/icinga/icinga-go-library/notifications"
 	"github.com/icinga/icinga-go-library/notifications/event"
+	"github.com/icinga/icinga-go-library/notifications/source"
 	"github.com/icinga/icinga-go-library/redis"
 	"github.com/icinga/icinga-go-library/types"
 	"github.com/icinga/icinga-go-library/utils"
 	"github.com/icinga/icingadb/pkg/common"
+	"github.com/icinga/icingadb/pkg/icingadb/history"
 	v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history"
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"slices"
+	"strings"
)
 
-// Source is an Icinga Notifications compatible source implementation to push events to Icinga Notifications.
-//
-// A new Source should be created by the NewNotificationsSource function. New history entries can be submitted by
-// calling the Source.Submit method. The Source will then process the history entries in a background worker goroutine.
-type Source struct {
-	notifications.Config
+// submission wraps a [database.Entity] handed over to the Client, together with timestamps for tracing.
+type submission struct {
+	entity database.Entity      // the history entry to be turned into an event
+	traces map[string]time.Time // labeled timestamps of the processing steps
+}
 
-	inputCh chan database.Entity // inputCh is a buffered channel used to submit history entries to the worker.
+// MarshalLogObject implements [zapcore.ObjectMarshaler] to print a debug trace.
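+//
+// The rendered trace lists the relative durations between consecutive steps; an illustrative example (labels from
+// the worker, values made up):
+//
+//	processing_time: 2.01s, trace: "worker_start: 3ms, process_last: 2s, worker_fin: 10ms"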
+func (sub submission) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
+	encoder.AddString("type", fmt.Sprintf("%T", sub.entity))
+
+	if len(sub.traces) < 1 {
+		return nil
+	}
+
+	tracesKeys := slices.SortedFunc(func(yield func(string) bool) {
+		for key := range sub.traces {
+			if !yield(key) {
+				return
+			}
+		}
+	}, func(a string, b string) int {
+		return sub.traces[a].Compare(sub.traces[b])
+	})
+
+	relTraces := make([]string, 0, len(tracesKeys)-1)
+	for i := 1; i < len(tracesKeys); i++ {
+		relTraces = append(relTraces, fmt.Sprintf("%s: %v",
+			tracesKeys[i],
+			sub.traces[tracesKeys[i]].Sub(sub.traces[tracesKeys[i-1]])))
+	}
+
+	encoder.AddDuration("processing_time", sub.traces[tracesKeys[len(tracesKeys)-1]].Sub(sub.traces[tracesKeys[0]]))
+	encoder.AddString("trace", strings.Join(relTraces, ", "))
+
+	return nil
+}
+
+// Client is an Icinga Notifications compatible client implementation to push events to Icinga Notifications.
+//
+// A new Client should be created by the NewNotificationsClient function. New history entries can be submitted by
+// calling the Client.Submit method. The Client will then process the history entries in a background worker goroutine.
+type Client struct {
+	source.Config
+
+	inputCh chan submission // inputCh is a buffered channel used to submit history entries to the worker.
 
 	db     *database.DB
 	logger *logging.Logger
 
-	rules *notifications.SourceRulesInfo // rules holds the latest rules fetched from Icinga Notifications.
+	rules *source.RulesInfo // rules holds the latest rules fetched from Icinga Notifications.
 
 	ctx       context.Context
 	ctxCancel context.CancelFunc
 
-	notificationsClient *notifications.Client // The Icinga Notifications client used to interact with the API.
-	redisClient         *redis.Client         // redisClient is the Redis client used to fetch host and service names for events.
+	notificationsClient *source.Client // The Icinga Notifications client used to interact with the API.
+	redisClient         *redis.Client  // redisClient is the Redis client used to fetch host and service names for events.
 }
 
-// NewNotificationsSource creates a new Source connected to an existing database and logger.
+// NewNotificationsClient creates a new Client connected to an existing database and logger.
 //
 // This function starts a worker goroutine in the background which can be stopped by ending the provided context.
-func NewNotificationsSource(
+func NewNotificationsClient(
 	ctx context.Context,
 	db *database.DB,
 	rc *redis.Client,
 	logger *logging.Logger,
-	cfg notifications.Config,
-) *Source {
+	cfg source.Config,
+) *Client {
 	ctx, ctxCancel := context.WithCancel(ctx)
 
-	source := &Source{
+	client := &Client{
 		Config:      cfg,
-		inputCh:     make(chan database.Entity, 1<<10), // chosen by fair dice roll
+		inputCh:     make(chan submission, 1<<10), // chosen by fair dice roll
 		db:          db,
 		logger:      logger,
-		rules:       &notifications.SourceRulesInfo{Version: notifications.EmptyRulesVersion},
+		rules:       &source.RulesInfo{Version: source.EmptyRulesVersion},
 		redisClient: rc,
 		ctx:         ctx,
 		ctxCancel:   ctxCancel,
 	}
 
-	client, err := notifications.NewClient(source.Config, "Icinga DB")
+	notificationsClient, err := source.NewClient(client.Config, "Icinga DB")
 	if err != nil {
 		logger.Fatalw("Cannot create Icinga Notifications client", zap.Error(err))
 	}
-	source.notificationsClient = client
+	client.notificationsClient = notificationsClient
 
-	go source.worker()
+	go client.worker()
 
-	return source
+	return client
 }
 
 // evaluateRulesForObject returns the rule IDs for each matching query.
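A side note on the rule evaluation in the next hunk: each rule's ObjectFilterExpr arrives URL-encoded and must be unescaped before it is rebound and executed. A minimal, standalone sketch of just that decode step, using only the standard library (the expression below is made up):

	package main

	import (
		"fmt"
		"net/url"
	)

	func main() {
		// A rule's object filter expression as stored: URL-encoded raw SQL.
		expr := "select%20*%20from%20host%20where%20id%20=%20:host_id%20and%20environment_id%20=%20:environment_id"

		// Unescape it before handing it to the database layer, as evaluateRulesForObject does.
		query, err := url.QueryUnescape(expr)
		if err != nil {
			panic(err)
		}

		// Prints: select * from host where id = :host_id and environment_id = :environment_id
		fmt.Println(query)
	}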
@@ -93,7 +132,7 @@ func NewNotificationsSource(
 // > select * from host where id = :host_id and environment_id = :environment_id and name like 'prefix_%'
 //
 // The :host_id and :environment_id parameters will be bound to the entity's ID and EnvironmentId fields, respectively.
-func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) {
+func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) {
 	outRuleIds := make([]int64, 0, len(s.rules.Rules))
 
 	for rule := range s.rules.Iter() {
@@ -102,35 +141,33 @@ func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Ent
 			continue
 		}
 
-		run := func() error {
+		evaluates, err := func() (bool, error) {
 			// The raw SQL query in the database is URL-encoded (mostly the space character is replaced by %20).
 			// So, we need to unescape it before passing it to the database.
 			query, err := url.QueryUnescape(rule.ObjectFilterExpr)
 			if err != nil {
-				return errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr)
+				return false, errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr)
 			}
 
 			rows, err := s.db.NamedQueryContext(ctx, s.db.Rebind(query), entity)
 			if err != nil {
-				return err
+				return false, err
 			}
 			defer func() { _ = rows.Close() }()
 
 			if !rows.Next() {
-				return sql.ErrNoRows
+				return false, nil
 			}
 
-			return nil
-		}
-
-		if err := run(); err == nil {
-			outRuleIds = append(outRuleIds, rule.Id)
-		} else if errors.Is(err, sql.ErrNoRows) {
-			continue
-		} else {
+			return true, nil
+		}()
+		if err != nil {
 			return nil, errors.Wrapf(err, "cannot fetch rule %d from %q", rule.Id, rule.ObjectFilterExpr)
+		} else if !evaluates {
+			continue
 		}
+
+		outRuleIds = append(outRuleIds, rule.Id)
 	}
 
-	return outRuleIds[:len(outRuleIds):len(outRuleIds)], nil
+	return outRuleIds, nil
 }
 
 // buildCommonEvent creates an event.Event based on Host and (optional) Service names.
@@ -138,38 +175,38 @@ func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Ent
 //
 // This function is used by all event builders to create a common event structure that includes the host and service
 // names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event.
 // Any event type-specific information (like severity, message, etc.) is added by the specific event builders.
-func (s *Source) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
+func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
 	var (
-		eventName string
-		eventUrl  *url.URL
-		eventTags map[string]string
+		objectName string
+		objectUrl  *url.URL
+		objectTags map[string]string
 	)
 
 	if rlr.ServiceName != "" {
-		eventName = rlr.HostName + "!" + rlr.ServiceName
+		objectName = rlr.HostName + "!" + rlr.ServiceName
 
-		eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
-		eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
+		objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
+		objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
 
-		eventTags = map[string]string{
+		objectTags = map[string]string{
 			"host":    rlr.HostName,
 			"service": rlr.ServiceName,
 		}
 	} else {
-		eventName = rlr.HostName
+		objectName = rlr.HostName
 
-		eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
-		eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName)
+		objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
+		objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName)
 
-		eventTags = map[string]string{
+		objectTags = map[string]string{
 			"host": rlr.HostName,
 		}
 	}
 
 	return &event.Event{
-		Name: eventName,
-		URL:  eventUrl.String(),
-		Tags: eventTags,
+		Name: objectName,
+		URL:  objectUrl.String(),
+		Tags: objectTags,
 	}, nil
 }
 
@@ -177,7 +214,7 @@ func (s *Source) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
 //
 // The resulting event will have all the necessary information for a state change event, and must
 // not be further modified by the caller.
-func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
+func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
 	res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
 	if err != nil {
 		return nil, err
@@ -224,8 +261,8 @@ func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH
 	return ev, nil
 }
 
-// buildDowntimeHistoryEvent from a downtime history entry.
-func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.DowntimeHistory) (*event.Event, error) {
+// buildDowntimeHistoryMetaEvent from a downtime history entry.
+func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
 	res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
 	if err != nil {
 		return nil, err
@@ -236,29 +273,36 @@ func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.Dow
 		return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
 	}
 
-	if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool {
-		ev.Type = event.TypeDowntimeRemoved
-		ev.Message = "Downtime was cancelled"
-
-		if h.CancelledBy.Valid {
-			ev.Username = h.CancelledBy.String
-		}
-	} else if h.EndTime.Time().Compare(time.Now()) <= 0 {
-		ev.Type = event.TypeDowntimeEnd
-		ev.Message = "Downtime expired"
-	} else {
+	switch h.EventType {
+	case "downtime_start":
 		ev.Type = event.TypeDowntimeStart
 		ev.Username = h.Author
 		ev.Message = h.Comment
 		ev.Mute = types.MakeBool(true)
 		ev.MuteReason = "Checkable is in downtime"
+
+	case "downtime_end":
+		if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool {
+			ev.Type = event.TypeDowntimeRemoved
+			ev.Message = "Downtime was cancelled"
+
+			if h.CancelledBy.Valid {
+				ev.Username = h.CancelledBy.String
+			}
+		} else {
+			ev.Type = event.TypeDowntimeEnd
+			ev.Message = "Downtime expired"
+		}
+
+	default:
+		return nil, fmt.Errorf("unexpected event type %q", h.EventType)
 	}
 
 	return ev, nil
 }
 
 // buildFlappingHistoryEvent from a flapping history entry.
-func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { +func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err @@ -289,7 +333,7 @@ func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla } // buildAcknowledgementHistoryEvent from an acknowledgment history entry. -func (s *Source) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { +func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err @@ -326,8 +370,8 @@ func (s *Source) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1hist return ev, nil } -// worker is the background worker launched by NewNotificationsSource. -func (s *Source) worker() { +// worker is the background worker launched by NewNotificationsClient. +func (s *Client) worker() { defer s.ctxCancel() for { @@ -335,55 +379,50 @@ func (s *Source) worker() { case <-s.ctx.Done(): return - case entity, more := <-s.inputCh: + case sub, more := <-s.inputCh: if !more { // Should never happen, but just in case. s.logger.Debug("Input channel closed, stopping worker") return } + sub.traces["worker_start"] = time.Now() + var ev *event.Event var eventErr error // Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go - switch h := entity.(type) { - case *v1history.NotificationHistory: - // Ignore for the moment. - continue + switch h := sub.entity.(type) { + case *v1history.AcknowledgementHistory: + ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h) + + case *v1history.DowntimeHistoryMeta: + ev, eventErr = s.buildDowntimeHistoryMetaEvent(s.ctx, h) + + case *v1history.FlappingHistory: + ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h) case *v1history.StateHistory: if h.StateType != common.HardState { continue } - ev, eventErr = s.buildStateHistoryEvent(s.ctx, h) - case *v1history.DowntimeHistory: - ev, eventErr = s.buildDowntimeHistoryEvent(s.ctx, h) - - case *v1history.CommentHistory: - // Ignore for the moment. - continue - - case *v1history.FlappingHistory: - ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h) - - case *v1history.AcknowledgementHistory: - ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h) - default: s.logger.Error("Cannot process unsupported type", + zap.Object("submission", sub), zap.String("type", fmt.Sprintf("%T", h))) continue } if eventErr != nil { s.logger.Errorw("Cannot build event from history entry", - zap.String("type", fmt.Sprintf("%T", entity)), + zap.Object("submission", sub), + zap.String("type", fmt.Sprintf("%T", sub.entity)), zap.Error(eventErr)) continue - } - if ev == nil { - s.logger.Error("No event was fetched, but no error was reported. This REALLY SHOULD NOT happen.") + } else if ev == nil { + // This really should not happen. 
+				s.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub))
 				continue
 			}
 
@@ -396,52 +435,77 @@ func (s *Source) worker() {
 				}),
 			))
 
+			sub.traces["evaluate_jump_pre"] = time.Now()
 		reevaluateRules:
-			eventRuleIds, err := s.evaluateRulesForObject(s.ctx, entity)
+			sub.traces["evaluate_jump_last"] = time.Now()
+			eventRuleIds, err := s.evaluateRulesForObject(s.ctx, sub.entity)
 			if err != nil {
-				eventLogger.Errorw("Cannot evaluate rules for event", zap.Error(err))
+				eventLogger.Errorw("Cannot evaluate rules for event",
+					zap.Object("submission", sub),
+					zap.Error(err))
 				continue
 			}
 
+			sub.traces["process_last"] = time.Now()
 			newEventRules, err := s.notificationsClient.ProcessEvent(s.ctx, ev, s.rules.Version, eventRuleIds...)
-			if errors.Is(err, notifications.ErrRulesOutdated) {
+			if errors.Is(err, source.ErrRulesOutdated) {
 				s.rules = newEventRules
 
-				eventLogger.Debugw("Re-evaluating rules for event after fetching new rules", zap.String("rules_version", s.rules.Version))
+				eventLogger.Infow("Re-evaluating rules for event after fetching new rules",
+					zap.Object("submission", sub),
+					zap.String("rules_version", s.rules.Version))
 
 				// Re-evaluate the just fetched rules for the current event.
 				goto reevaluateRules
 			} else if err != nil {
 				eventLogger.Errorw("Cannot submit event to Icinga Notifications",
+					zap.Object("submission", sub),
 					zap.String("rules_version", s.rules.Version),
 					zap.Any("rules", eventRuleIds),
 					zap.Error(err))
 				continue
 			}
 
-			eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds))
+			sub.traces["worker_fin"] = time.Now()
+			eventLogger.Debugw("Successfully submitted event to Icinga Notifications",
+				zap.Object("submission", sub),
+				zap.Any("rules", eventRuleIds))
 		}
 	}
 }
 
-// Submit a history entry to be processed by the Source's internal worker loop.
+// Submit a history entry to be processed by the Client's internal worker loop.
 //
 // Internally, a buffered channel is used for delivery, so this function should not block. Otherwise, the submission
 // is aborted after a second and an error is logged.
-func (s *Source) Submit(entity database.Entity) {
+func (s *Client) Submit(entity database.Entity) {
+	sub := submission{
+		entity: entity,
+		traces: map[string]time.Time{
+			"submit": time.Now(),
+		},
+	}
+
 	select {
 	case <-s.ctx.Done():
-		s.logger.Errorw("Source context is done, rejecting submission",
-			zap.String("submission", fmt.Sprintf("%+v", entity)),
+		s.logger.Errorw("Client context is done, rejecting submission",
+			zap.Object("submission", sub),
 			zap.Error(s.ctx.Err()))
 		return
 
-	case s.inputCh <- entity:
+	case s.inputCh <- sub:
 		return
 
 	case <-time.After(time.Second):
-		s.logger.Error("Source submission channel is blocking, rejecting submission",
-			zap.String("submission", fmt.Sprintf("%+v", entity)))
+		s.logger.Error("Client submission channel is blocking, rejecting submission",
+			zap.Object("submission", sub))
 		return
 	}
 }
+
+// SyncKeyStructPtrs maps the relevant history sync pipeline keys to the struct pointer types the Client consumes.
+// Keep it in sync with the type switch in the worker.
+var SyncKeyStructPtrs = map[string]any{
+	history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil),
+	history.SyncPipelineDowntime:        (*v1history.DowntimeHistoryMeta)(nil),
+	history.SyncPipelineFlapping:        (*v1history.FlappingHistory)(nil),
+	history.SyncPipelineState:           (*v1history.StateHistory)(nil),
+}
diff --git a/pkg/notifications/redis_fetch.go b/pkg/notifications/redis_fetch.go
index 8d61d776..7d7ec61d 100644
--- a/pkg/notifications/redis_fetch.go
+++ b/pkg/notifications/redis_fetch.go
@@ -23,7 +23,7 @@ import (
 // If this operation couldn't be completed within a reasonable time (a hard-coded 5 seconds), it will cancel the
 // request and return an error indicating that the operation timed out. In case of the serviceId being set, the
 // maximum execution time of the Redis HGet commands is 10s (5s for each HGet call).
-func (s *Source) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
+func (s *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
 	redisHGet := func(typ, field string, out *redisLookupResult) error {
 		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 		defer cancel()
@@ -69,14 +69,14 @@ type redisLookupResult struct {
 	ServiceName string `json:"-"` // Name of the service (only set in service context).
 
 	// Name is used to retrieve the host or service name from Redis.
-	// It should not be used for any other purpose apart from within the [Source.fetchHostServiceName] function.
+	// It should not be used for any other purpose apart from within the [Client.fetchHostServiceName] function.
 	Name string `json:"name"`
 }
 
 // UnmarshalBinary implements the [encoding.BinaryUnmarshaler] interface for redisLookupResult.
 //
 // It unmarshals the binary data of the Redis HGet result into the redisLookupResult struct.
-// This is required for the HGet().Scan() usage in the [Source.fetchHostServiceName] function to work correctly.
+// This is required for the HGet().Scan() usage in the [Client.fetchHostServiceName] function to work correctly.
 func (rlr *redisLookupResult) UnmarshalBinary(data []byte) error {
 	if len(data) == 0 {
 		return errors.New("empty data received for redisLookupResult")
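The commit message's two-second observation per Client.ProcessEvent call can be narrowed down by timing the call in isolation. A minimal sketch with a stand-in for the real method (source.Client.ProcessEvent is defined in icinga-go-library and only referenced here; the sleep merely simulates the suspected bcrypt-dominated round trip):

	package main

	import (
		"context"
		"log"
		"time"
	)

	// processEvent stands in for source.Client.ProcessEvent; swap in the real call to measure it.
	func processEvent(ctx context.Context) error {
		time.Sleep(2 * time.Second) // simulated server-side delay, e.g. a bcrypt hash comparison
		return nil
	}

	func main() {
		start := time.Now()
		if err := processEvent(context.Background()); err != nil {
			log.Fatal(err)
		}

		// With a single-threaded worker, n queued events cost roughly n times this duration.
		log.Printf("ProcessEvent took %v", time.Since(start))
	}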