diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go
index c7fd403c..27cb58f2 100644
--- a/cmd/icingadb/main.go
+++ b/cmd/icingadb/main.go
@@ -170,23 +170,26 @@ func run() int {
 	sig := make(chan os.Signal, 1)
 	signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
 
-	var notificationsSourceCallback func(database.Entity)
-	if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
-		logger.Info("Starting Icinga Notifications source")
-
-		notificationsSource := notifications.NewNotificationsSource(
-			ctx,
-			db,
-			rc,
-			logs.GetChildLogger("notifications-source"),
-			cfg)
-		notificationsSourceCallback = notificationsSource.Submit
-	}
-
 	go func() {
+		var callback func(database.Entity)
+		var callbackKeyStructPtr map[string]any
+
+		if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
+			logger.Info("Starting Icinga Notifications source")
+
+			notificationsSource := notifications.NewNotificationsClient(
+				ctx,
+				db,
+				rc,
+				logs.GetChildLogger("notifications-source"),
+				cfg)
+			callback = notificationsSource.Submit
+			callbackKeyStructPtr = notifications.SyncKeyStructPtrs
+		}
+
 		logger.Info("Starting history sync")
 
-		if err := hs.Sync(ctx, notificationsSourceCallback); err != nil && !utils.IsContextCanceled(err) {
+		if err := hs.Sync(ctx, callbackKeyStructPtr, callback); err != nil && !utils.IsContextCanceled(err) {
 			logger.Fatalf("%+v", err)
 		}
 	}()
diff --git a/internal/config/config.go b/internal/config/config.go
index 618a85fa..359cf052 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -4,7 +4,7 @@ import (
 	"github.com/creasty/defaults"
 	"github.com/icinga/icinga-go-library/database"
 	"github.com/icinga/icinga-go-library/logging"
-	"github.com/icinga/icinga-go-library/notifications"
+	"github.com/icinga/icinga-go-library/notifications/source"
 	"github.com/icinga/icinga-go-library/redis"
 	"github.com/icinga/icingadb/pkg/icingadb/history"
 	"github.com/pkg/errors"
@@ -16,11 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml"
 
 // Config defines Icinga DB config.
 type Config struct {
-	Database            database.Config      `yaml:"database" envPrefix:"DATABASE_"`
-	Redis               redis.Config         `yaml:"redis" envPrefix:"REDIS_"`
-	Logging             logging.Config       `yaml:"logging" envPrefix:"LOGGING_"`
-	Retention           RetentionConfig      `yaml:"retention" envPrefix:"RETENTION_"`
-	NotificationsSource notifications.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
+	Database            database.Config `yaml:"database" envPrefix:"DATABASE_"`
+	Redis               redis.Config    `yaml:"redis" envPrefix:"REDIS_"`
+	Logging             logging.Config  `yaml:"logging" envPrefix:"LOGGING_"`
+	Retention           RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
+	NotificationsSource source.Config   `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
 }
 
 func (c *Config) SetDefaults() {
diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go
index 30c7f5a2..ac1c28e6 100644
--- a/pkg/icingadb/history/sync.go
+++ b/pkg/icingadb/history/sync.go
@@ -39,9 +39,15 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
 
 // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
 //
-// If not nil, the callback function is appended to each synchronization pipeline and called before the entry is deleted
-// from Redis.
-func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
+// An optional callback and callbackKeyStructPtr may be given; either both must be nil or both must be non-nil.
+//
+// The callbackKeyStructPtr map specifies which pipeline keys are mapped to which entity type, identified by a struct
+// pointer. Keys missing from the map are skipped for the callback. The callback function itself must not block.
+func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, callback func(database.Entity)) error {
+	if (callbackKeyStructPtr == nil) != (callback == nil) {
+		return fmt.Errorf("callbackKeyStructPtr and callback must either both be nil or both be non-nil")
+	}
+
 	g, ctx := errgroup.WithContext(ctx)
 
 	for key, pipeline := range syncPipelines {
@@ -67,8 +73,13 @@ func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
 		// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
 		// executed successfully.
 
-		if callback != nil {
-			pipeline = append(pipeline, makeCallbackStageFunc(callback))
+		// Shadowed variable to allow appending custom callback stages.
+		pipeline := pipeline
+		if callbackKeyStructPtr != nil {
+			_, ok := callbackKeyStructPtr[key]
+			if ok {
+				pipeline = append(pipeline, makeCallbackStageFunc(callbackKeyStructPtr, callback))
+			}
 		}
 
 		ch := make([]chan redis.XMessage, len(pipeline)+1)
@@ -371,28 +382,17 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
 
 // makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message.
 //
+// The keyStructPtrs map determines which database.Entity type is used for the input data, based on the key.
+//
 // The callback call is blocking and the message will be forwarded to the out channel after the function has returned.
 // Thus, please ensure this function does not block too long.
-func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
+func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.Entity)) stageFunc {
 	return func(ctx context.Context, _ Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
 		defer close(out)
 
-		var structPtr database.Entity
-		switch key { // keep in sync with syncPipelines below
-		case "notification":
-			structPtr = (*v1.NotificationHistory)(nil)
-		case "state":
-			structPtr = (*v1.StateHistory)(nil)
-		case "downtime":
-			structPtr = (*v1.DowntimeHistory)(nil)
-		case "comment":
-			structPtr = (*v1.CommentHistory)(nil)
-		case "flapping":
-			structPtr = (*v1.FlappingHistory)(nil)
-		case "acknowledgement":
-			structPtr = (*v1.AcknowledgementHistory)(nil)
-		default:
-			return fmt.Errorf("unsupported key %q", key)
+		structPtr, ok := keyStructPtrs[key]
+		if !ok {
+			return fmt.Errorf("can't look up struct pointer for key %q", key)
 		}
 
 		structifier := structify.MakeMapStructifier(
@@ -409,7 +409,7 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
 
 			val, err := structifier(msg.Values)
 			if err != nil {
-				return errors.Wrapf(err, "can't structify values %#v for %s", msg.Values, key)
+				return errors.Wrapf(err, "can't structify values %#v for %q", msg.Values, key)
 			}
 
 			entity, ok := val.(database.Entity)
@@ -427,32 +427,41 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
 	}
 }
 
+const (
+	SyncPipelineAcknowledgement = "acknowledgement"
+	SyncPipelineComment         = "comment"
+	SyncPipelineDowntime        = "downtime"
+	SyncPipelineFlapping        = "flapping"
+	SyncPipelineNotification    = "notification"
+	SyncPipelineState           = "state"
+)
+
 var syncPipelines = map[string][]stageFunc{
-	"notification": {
-		writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
-
userNotificationStage, // user_notification_history (depends on notification_history) - writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + SyncPipelineAcknowledgement: { + writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history + writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) }, - "state": { - writeOneEntityStage((*v1.StateHistory)(nil)), // state_history - writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) - writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state + SyncPipelineComment: { + writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history + writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) }, - "downtime": { + SyncPipelineDowntime: { writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history) writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime }, - "comment": { - writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history - writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) - }, - "flapping": { + SyncPipelineFlapping: { writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history) }, - "acknowledgement": { - writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history - writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) + SyncPipelineNotification: { + writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history + userNotificationStage, // user_notification_history (depends on notification_history) + writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + }, + SyncPipelineState: { + writeOneEntityStage((*v1.StateHistory)(nil)), // state_history + writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) + writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state }, } diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go index 969cd472..bbbfbbc1 100644 --- a/pkg/icingadb/v1/history/downtime.go +++ b/pkg/icingadb/v1/history/downtime.go @@ -88,6 +88,12 @@ func (*HistoryDowntime) TableName() string { return "history" } +type DowntimeHistoryMeta struct { + DowntimeHistoryEntity `json:",inline"` + DowntimeHistory `json:",inline"` + HistoryMeta `json:",inline"` +} + type SlaHistoryDowntime struct { DowntimeHistoryEntity `json:",inline"` HistoryTableMeta `json:",inline"` diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go index 5a469ece..1a0ddd23 100644 --- a/pkg/notifications/notifications.go +++ b/pkg/notifications/notifications.go @@ -2,80 +2,119 @@ package notifications import ( "context" - "database/sql" "fmt" "net/url" "time" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" - "github.com/icinga/icinga-go-library/notifications" "github.com/icinga/icinga-go-library/notifications/event" + "github.com/icinga/icinga-go-library/notifications/source" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-go-library/utils" "github.com/icinga/icingadb/pkg/common" + 
"github.com/icinga/icingadb/pkg/icingadb/history" v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "slices" + "strings" ) -// Source is an Icinga Notifications compatible source implementation to push events to Icinga Notifications. -// -// A new Source should be created by the NewNotificationsSource function. New history entries can be submitted by -// calling the Source.Submit method. The Source will then process the history entries in a background worker goroutine. -type Source struct { - notifications.Config +// submission of a [database.Entity] to the Client. +type submission struct { + entity database.Entity + traces map[string]time.Time +} - inputCh chan database.Entity // inputCh is a buffered channel used to submit history entries to the worker. +// MarshalLogObject implements [zapcore.ObjectMarshaler] to print a debug trace. +func (sub submission) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", fmt.Sprintf("%T", sub.entity)) + + if len(sub.traces) < 1 { + return nil + } + + tracesKeys := slices.SortedFunc(func(yield func(string) bool) { + for key := range sub.traces { + if !yield(key) { + return + } + } + }, func(a string, b string) int { + return sub.traces[a].Compare(sub.traces[b]) + }) + + relTraces := make([]string, 0, len(tracesKeys)-1) + for i := 1; i < len(tracesKeys); i++ { + relTraces = append(relTraces, fmt.Sprintf("%s: %v", + tracesKeys[i], + sub.traces[tracesKeys[i]].Sub(sub.traces[tracesKeys[i-1]]))) + } + + encoder.AddDuration("processing_time", sub.traces[tracesKeys[len(tracesKeys)-1]].Sub(sub.traces[tracesKeys[0]])) + encoder.AddString("trace", strings.Join(relTraces, ", ")) + + return nil +} + +// Client is an Icinga Notifications compatible client implementation to push events to Icinga Notifications. +// +// A new Client should be created by the NewNotificationsClient function. New history entries can be submitted by +// calling the Source.Submit method. The Client will then process the history entries in a background worker goroutine. +type Client struct { + source.Config + + inputCh chan submission // inputCh is a buffered channel used to submit history entries to the worker. db *database.DB logger *logging.Logger - rules *notifications.SourceRulesInfo // rules holds the latest rules fetched from Icinga Notifications. + rules *source.RulesInfo // rules holds the latest rules fetched from Icinga Notifications. ctx context.Context ctxCancel context.CancelFunc - notificationsClient *notifications.Client // The Icinga Notifications client used to interact with the API. - redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events. + notificationsClient *source.Client // The Icinga Notifications client used to interact with the API. + redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events. } -// NewNotificationsSource creates a new Source connected to an existing database and logger. +// NewNotificationsClient creates a new Client connected to an existing database and logger. // // This function starts a worker goroutine in the background which can be stopped by ending the provided context. 
-func NewNotificationsSource( +func NewNotificationsClient( ctx context.Context, db *database.DB, rc *redis.Client, logger *logging.Logger, - cfg notifications.Config, -) *Source { + cfg source.Config, +) *Client { ctx, ctxCancel := context.WithCancel(ctx) - source := &Source{ + client := &Client{ Config: cfg, - inputCh: make(chan database.Entity, 1<<10), // chosen by fair dice roll + inputCh: make(chan submission, 1<<10), // chosen by fair dice roll db: db, logger: logger, - rules: ¬ifications.SourceRulesInfo{Version: notifications.EmptyRulesVersion}, + rules: &source.RulesInfo{Version: source.EmptyRulesVersion}, redisClient: rc, ctx: ctx, ctxCancel: ctxCancel, } - client, err := notifications.NewClient(source.Config, "Icinga DB") + notificationsClient, err := source.NewClient(client.Config, "Icinga DB") if err != nil { logger.Fatalw("Cannot create Icinga Notifications client", zap.Error(err)) } - source.notificationsClient = client + client.notificationsClient = notificationsClient - go source.worker() + go client.worker() - return source + return client } // evaluateRulesForObject returns the rule IDs for each matching query. @@ -93,7 +132,7 @@ func NewNotificationsSource( // > select * from host where id = :host_id and environment_id = :environment_id and name like 'prefix_%' // // The :host_id and :environment_id parameters will be bound to the entity's ID and EnvironmentId fields, respectively. -func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) { +func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) { outRuleIds := make([]int64, 0, len(s.rules.Rules)) for rule := range s.rules.Iter() { @@ -102,35 +141,33 @@ func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Ent continue } - run := func() error { + evaluates, err := func() (bool, error) { // The raw SQL query in the database is URL-encoded (mostly the space character is replaced by %20). // So, we need to unescape it before passing it to the database. query, err := url.QueryUnescape(rule.ObjectFilterExpr) if err != nil { - return errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr) + return false, errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr) } rows, err := s.db.NamedQueryContext(ctx, s.db.Rebind(query), entity) if err != nil { - return err + return false, err } defer func() { _ = rows.Close() }() if !rows.Next() { - return sql.ErrNoRows + return false, nil } - return nil - } - - if err := run(); err == nil { - outRuleIds = append(outRuleIds, rule.Id) - } else if errors.Is(err, sql.ErrNoRows) { - continue - } else { + return true, nil + }() + if err != nil { return nil, errors.Wrapf(err, "cannot fetch rule %d from %q", rule.Id, rule.ObjectFilterExpr) + } else if !evaluates { + continue } + outRuleIds = append(outRuleIds, rule.Id) } - return outRuleIds[:len(outRuleIds):len(outRuleIds)], nil + return outRuleIds, nil } // buildCommonEvent creates an event.Event based on Host and (optional) Service names. @@ -138,38 +175,38 @@ func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Ent // This function is used by all event builders to create a common event structure that includes the host and service // names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event. // Any event type-specific information (like severity, message, etc.) 
is added by the specific event builders. -func (s *Source) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) { +func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) { var ( - eventName string - eventUrl *url.URL - eventTags map[string]string + objectName string + objectUrl *url.URL + objectTags map[string]string ) if rlr.ServiceName != "" { - eventName = rlr.HostName + "!" + rlr.ServiceName + objectName = rlr.HostName + "!" + rlr.ServiceName - eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service") - eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName) + objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service") + objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName) - eventTags = map[string]string{ + objectTags = map[string]string{ "host": rlr.HostName, "service": rlr.ServiceName, } } else { - eventName = rlr.HostName + objectName = rlr.HostName - eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host") - eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName) + objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host") + objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName) - eventTags = map[string]string{ + objectTags = map[string]string{ "host": rlr.HostName, } } return &event.Event{ - Name: eventName, - URL: eventUrl.String(), - Tags: eventTags, + Name: objectName, + URL: objectUrl.String(), + Tags: objectTags, }, nil } @@ -177,7 +214,7 @@ func (s *Source) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) // // The resulted event will have all the necessary information for a state change event, and must // not be further modified by the caller. -func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { +func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err @@ -224,8 +261,8 @@ func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH return ev, nil } -// buildDowntimeHistoryEvent from a downtime history entry. -func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.DowntimeHistory) (*event.Event, error) { +// buildDowntimeHistoryMetaEvent from a downtime history entry. 
+func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) { res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err @@ -236,29 +273,36 @@ func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.Dow return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } - if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool { - ev.Type = event.TypeDowntimeRemoved - ev.Message = "Downtime was cancelled" - - if h.CancelledBy.Valid { - ev.Username = h.CancelledBy.String - } - } else if h.EndTime.Time().Compare(time.Now()) <= 0 { - ev.Type = event.TypeDowntimeEnd - ev.Message = "Downtime expired" - } else { + switch h.EventType { + case "downtime_start": ev.Type = event.TypeDowntimeStart ev.Username = h.Author ev.Message = h.Comment ev.Mute = types.MakeBool(true) ev.MuteReason = "Checkable is in downtime" + + case "downtime_end": + if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool { + ev.Type = event.TypeDowntimeRemoved + ev.Message = "Downtime was cancelled" + + if h.CancelledBy.Valid { + ev.Username = h.CancelledBy.String + } + } else { + ev.Type = event.TypeDowntimeEnd + ev.Message = "Downtime expired" + } + + default: + return nil, fmt.Errorf("unexpected event type %q", h.EventType) } return ev, nil } // buildFlappingHistoryEvent from a flapping history entry. -func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { +func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err @@ -289,7 +333,7 @@ func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla } // buildAcknowledgementHistoryEvent from an acknowledgment history entry. -func (s *Source) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { +func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { return nil, err @@ -326,8 +370,8 @@ func (s *Source) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1hist return ev, nil } -// worker is the background worker launched by NewNotificationsSource. -func (s *Source) worker() { +// worker is the background worker launched by NewNotificationsClient. +func (s *Client) worker() { defer s.ctxCancel() for { @@ -335,55 +379,50 @@ func (s *Source) worker() { case <-s.ctx.Done(): return - case entity, more := <-s.inputCh: + case sub, more := <-s.inputCh: if !more { // Should never happen, but just in case. s.logger.Debug("Input channel closed, stopping worker") return } + sub.traces["worker_start"] = time.Now() + var ev *event.Event var eventErr error // Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go - switch h := entity.(type) { - case *v1history.NotificationHistory: - // Ignore for the moment. 
- continue + switch h := sub.entity.(type) { + case *v1history.AcknowledgementHistory: + ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h) + + case *v1history.DowntimeHistoryMeta: + ev, eventErr = s.buildDowntimeHistoryMetaEvent(s.ctx, h) + + case *v1history.FlappingHistory: + ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h) case *v1history.StateHistory: if h.StateType != common.HardState { continue } - ev, eventErr = s.buildStateHistoryEvent(s.ctx, h) - case *v1history.DowntimeHistory: - ev, eventErr = s.buildDowntimeHistoryEvent(s.ctx, h) - - case *v1history.CommentHistory: - // Ignore for the moment. - continue - - case *v1history.FlappingHistory: - ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h) - - case *v1history.AcknowledgementHistory: - ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h) - default: s.logger.Error("Cannot process unsupported type", + zap.Object("submission", sub), zap.String("type", fmt.Sprintf("%T", h))) continue } if eventErr != nil { s.logger.Errorw("Cannot build event from history entry", - zap.String("type", fmt.Sprintf("%T", entity)), + zap.Object("submission", sub), + zap.String("type", fmt.Sprintf("%T", sub.entity)), zap.Error(eventErr)) continue - } - if ev == nil { - s.logger.Error("No event was fetched, but no error was reported. This REALLY SHOULD NOT happen.") + } else if ev == nil { + // This really should not happen. + s.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub)) continue } @@ -396,52 +435,77 @@ func (s *Source) worker() { }), )) + sub.traces["evaluate_jump_pre"] = time.Now() reevaluateRules: - eventRuleIds, err := s.evaluateRulesForObject(s.ctx, entity) + sub.traces["evaluate_jump_last"] = time.Now() + eventRuleIds, err := s.evaluateRulesForObject(s.ctx, sub.entity) if err != nil { - eventLogger.Errorw("Cannot evaluate rules for event", zap.Error(err)) + eventLogger.Errorw("Cannot evaluate rules for event", + zap.Object("submission", sub), + zap.Error(err)) continue } + sub.traces["process_last"] = time.Now() newEventRules, err := s.notificationsClient.ProcessEvent(s.ctx, ev, s.rules.Version, eventRuleIds...) - if errors.Is(err, notifications.ErrRulesOutdated) { + if errors.Is(err, source.ErrRulesOutdated) { s.rules = newEventRules - eventLogger.Debugw("Re-evaluating rules for event after fetching new rules", zap.String("rules_version", s.rules.Version)) + eventLogger.Infow("Re-evaluating rules for event after fetching new rules", + zap.Object("submission", sub), + zap.String("rules_version", s.rules.Version)) // Re-evaluate the just fetched rules for the current event. goto reevaluateRules } else if err != nil { eventLogger.Errorw("Cannot submit event to Icinga Notifications", + zap.Object("submission", sub), zap.String("rules_version", s.rules.Version), zap.Any("rules", eventRuleIds), zap.Error(err)) continue } - eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds)) + sub.traces["worker_fin"] = time.Now() + eventLogger.Debugw("Successfully submitted event to Icinga Notifications", + zap.Object("submission", sub), + zap.Any("rules", eventRuleIds)) } } } -// Submit a history entry to be processed by the Source's internal worker loop. +// Submit a history entry to be processed by the Client's internal worker loop. // // Internally, a buffered channel is used for delivery. So this function should not block. Otherwise, it will abort // after a second and an error is logged. 
-func (s *Source) Submit(entity database.Entity) { +func (s *Client) Submit(entity database.Entity) { + sub := submission{ + entity: entity, + traces: map[string]time.Time{ + "submit": time.Now(), + }, + } + select { case <-s.ctx.Done(): - s.logger.Errorw("Source context is done, rejecting submission", - zap.String("submission", fmt.Sprintf("%+v", entity)), + s.logger.Errorw("Client context is done, rejecting submission", + zap.Object("submission", sub), zap.Error(s.ctx.Err())) return - case s.inputCh <- entity: + case s.inputCh <- sub: return case <-time.After(time.Second): - s.logger.Error("Source submission channel is blocking, rejecting submission", - zap.String("submission", fmt.Sprintf("%+v", entity))) + s.logger.Error("Client submission channel is blocking, rejecting submission", + zap.Object("submission", sub)) return } } + +var SyncKeyStructPtrs = map[string]any{ + history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil), + history.SyncPipelineDowntime: (*v1history.DowntimeHistoryMeta)(nil), + history.SyncPipelineFlapping: (*v1history.FlappingHistory)(nil), + history.SyncPipelineState: (*v1history.StateHistory)(nil), +} diff --git a/pkg/notifications/redis_fetch.go b/pkg/notifications/redis_fetch.go index 8d61d776..7d7ec61d 100644 --- a/pkg/notifications/redis_fetch.go +++ b/pkg/notifications/redis_fetch.go @@ -23,7 +23,7 @@ import ( // If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the // request and return an error indicating that the operation timed out. In case of the serviceId being set, the // maximum execution time of the Redis HGet commands is 10s (5s for each HGet call). -func (s *Source) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { +func (s *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { redisHGet := func(typ, field string, out *redisLookupResult) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -69,14 +69,14 @@ type redisLookupResult struct { ServiceName string `json:"-"` // Name of the service (only set in service context). // Name is used to retrieve the host or service name from Redis. - // It should not be used for any other purpose apart from within the [Source.fetchHostServiceName] function. + // It should not be used for any other purpose apart from within the [Client.fetchHostServiceName] function. Name string `json:"name"` } // UnmarshalBinary implements the [encoding.BinaryUnmarshaler] interface for redisLookupResult. // // It unmarshals the binary data of the Redis HGet result into the redisLookupResult struct. -// This is required for the HGet().Scan() usage in the [Source.fetchHostServiceName] function to work correctly. +// This is required for the HGet().Scan() usage in the [Client.fetchHostServiceName] function to work correctly. func (rlr *redisLookupResult) UnmarshalBinary(data []byte) error { if len(data) == 0 { return errors.New("empty data received for redisLookupResult")
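For reference, a minimal sketch of how a caller wires the new three-argument Sync signature introduced above. The package name, the helper wireHistorySync, and its parameter list are illustrative only and not part of this patch; the actual wiring lives in cmd/icingadb/main.go.

package notificationsexample

import (
	"context"

	"github.com/icinga/icinga-go-library/database"
	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/notifications/source"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history"
	"github.com/icinga/icingadb/pkg/notifications"
)

// wireHistorySync starts the history sync and, if an API base URL is
// configured, attaches the Icinga Notifications callback to the pipelines
// listed in notifications.SyncKeyStructPtrs.
func wireHistorySync(
	ctx context.Context,
	hs *history.Sync,
	db *database.DB,
	rc *redis.Client,
	logger *logging.Logger,
	cfg source.Config,
) error {
	// Both values stay nil when no notifications source is configured,
	// which disables the callback stage entirely.
	var callback func(database.Entity)
	var keyStructPtrs map[string]any

	if cfg.ApiBaseUrl != "" {
		client := notifications.NewNotificationsClient(ctx, db, rc, logger, cfg)
		callback = client.Submit
		keyStructPtrs = notifications.SyncKeyStructPtrs
	}

	// Sync rejects the call if exactly one of the two values is nil.
	return hs.Sync(ctx, keyStructPtrs, callback)
}

Passing the map and the callback together keeps the "which streams feed the callback" decision with the consumer, so sync.go no longer hard-codes the list of supported keys.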
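Likewise, a sketch of what a consumer-side callback may look like under the contract documented on Sync: only entity types registered in the key-to-struct-pointer map reach the callback, and the callback has to hand work off quickly instead of blocking the pipeline. The channel size and log calls are illustrative, not part of this patch.

package notificationsexample

import (
	"log"

	"github.com/icinga/icinga-go-library/database"
	v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history"
)

// entityCh decouples the history sync from a slower consumer, mirroring the
// buffered inputCh used by Client.Submit.
var entityCh = make(chan database.Entity, 1024)

// exampleCallback classifies the submitted entity and hands it off without
// blocking, dropping it if the consumer cannot keep up.
func exampleCallback(entity database.Entity) {
	switch entity.(type) {
	case *v1history.StateHistory,
		*v1history.DowntimeHistoryMeta,
		*v1history.FlappingHistory,
		*v1history.AcknowledgementHistory:
		select {
		case entityCh <- entity:
		default:
			log.Printf("dropping %T, consumer is too slow", entity)
		}
	default:
		// Only keys registered in the map passed to Sync reach the callback,
		// so this branch indicates a mismatch between the map and this switch.
		log.Printf("unexpected entity type %T", entity)
	}
}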