diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index c47cea83..aca62031 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -170,7 +170,7 @@ func run() int { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) - go func() { + { var ( callbackName string callbackKeyStructPtr map[string]any @@ -180,29 +180,34 @@ func run() int { if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" { logger.Info("Starting Icinga Notifications source") - notificationsSource := notifications.NewNotificationsClient( + notificationsSource, err := notifications.NewNotificationsClient( ctx, db, rc, logs.GetChildLogger("notifications-source"), cfg) + if err != nil { + logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err)) + } callbackName = "notifications_sync" callbackKeyStructPtr = notifications.SyncKeyStructPtrs callbackFn = notificationsSource.Submit } - logger.Info("Starting history sync") + go func() { + logger.Info("Starting history sync") - if err := hs.Sync( - ctx, - callbackName, - callbackKeyStructPtr, - callbackFn, - ); err != nil && !utils.IsContextCanceled(err) { - logger.Fatalf("%+v", err) - } - }() + if err := hs.Sync( + ctx, + callbackName, + callbackKeyStructPtr, + callbackFn, + ); err != nil && !utils.IsContextCanceled(err) { + logger.Fatalf("%+v", err) + } + }() + } // Main loop for { diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 49fbfdf6..f9f62838 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" "reflect" + "slices" "sync" "time" ) @@ -97,7 +98,7 @@ func (s Sync) Sync( // Shadowed variable to allow appending custom callbacks. pipeline := pipeline if hasCallbackStage { - pipeline = append(pipeline, makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn)) + pipeline = append(slices.Clip(pipeline), makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn)) } ch := make([]chan redis.XMessage, len(pipeline)+1) @@ -470,7 +471,7 @@ func makeCallbackStageFunc( backlogLastId := "" backlogMsgCounter := 0 - const backlogTimerMinInterval, backlogTimerMaxInterval = 10 * time.Millisecond, time.Minute + const backlogTimerMinInterval, backlogTimerMaxInterval = time.Millisecond, time.Minute backlogTimerInterval := backlogTimerMinInterval backlogTimer := time.NewTimer(backlogTimerInterval) _ = backlogTimer.Stop() diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go index bbbfbbc1..e43f918d 100644 --- a/pkg/icingadb/v1/history/downtime.go +++ b/pkg/icingadb/v1/history/downtime.go @@ -88,6 +88,9 @@ func (*HistoryDowntime) TableName() string { return "history" } +// DowntimeHistoryMeta is a combined HistoryMeta struct of DowntimeHistoryEntity and DowntimeHistory. +// +// It is used in the notifications package and became necessary as values of both structs were required. type DowntimeHistoryMeta struct { DowntimeHistoryEntity `json:",inline"` DowntimeHistory `json:",inline"` diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go index 7ee4612c..25df8094 100644 --- a/pkg/notifications/notifications.go +++ b/pkg/notifications/notifications.go @@ -49,26 +49,25 @@ func NewNotificationsClient( rc *redis.Client, logger *logging.Logger, cfg source.Config, -) *Client { - client := &Client{ +) (*Client, error) { + notificationsClient, err := source.NewClient(cfg, "Icinga DB "+internal.Version.Version) + if err != nil { + return nil, err + } + + return &Client{ Config: cfg, db: db, logger: logger, - rulesInfo: &source.RulesInfo{}, - redisClient: rc, - ctx: ctx, - } - notificationsClient, err := source.NewClient(client.Config, fmt.Sprintf("Icinga DB %s", internal.Version.Version)) - if err != nil { - logger.Fatalw("Cannot create Icinga Notifications client", zap.Error(err)) - } - client.notificationsClient = notificationsClient + rulesInfo: &source.RulesInfo{}, - return client + notificationsClient: notificationsClient, + redisClient: rc, + }, nil } // evaluateRulesForObject returns the rule IDs for each matching query. @@ -124,34 +123,36 @@ func (client *Client) evaluateRulesForObject(ctx context.Context, entity databas return outRuleIds, nil } -// buildCommonEvent creates an event.Event based on Host and (optional) Service names. +// buildCommonEvent creates an event.Event based on Host and (optional) Service IDs. // // This function is used by all event builders to create a common event structure that includes the host and service -// names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event. +// names, an Icinga DB Web reference, and the tags for the event. // Any event type-specific information (like severity, message, etc.) is added by the specific event builders. -func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) { +func (client *Client) buildCommonEvent( + ctx context.Context, + hostId, serviceId types.Binary, +) (*event.Event, *redisLookupResult, error) { + rlr, err := client.fetchHostServiceName(ctx, hostId, serviceId) + if err != nil { + return nil, nil, err + } + var ( objectName string - objectUrl url.URL + objectUrl string objectTags map[string]string ) if rlr.ServiceName != "" { objectName = rlr.HostName + "!" + rlr.ServiceName - - objectUrl.Path = "/icingadb/service" - objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName) - + objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName) objectTags = map[string]string{ "host": rlr.HostName, "service": rlr.ServiceName, } } else { objectName = rlr.HostName - - objectUrl.Path = "/icingadb/host" - objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName) - + objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(rlr.HostName) objectTags = map[string]string{ "host": rlr.HostName, } @@ -159,9 +160,9 @@ func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, er return &event.Event{ Name: objectName, - URL: objectUrl.String(), + URL: objectUrl, Tags: objectTags, - }, nil + }, rlr, nil } // buildStateHistoryEvent builds a fully initialized event.Event from a state history entry. @@ -169,19 +170,14 @@ func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, er // The resulted event will have all the necessary information for a state change event, and must // not be further modified by the caller. func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { - res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) + ev, rlr, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, err - } - - ev, err := client.buildCommonEvent(res) - if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) } ev.Type = event.TypeState - if res.ServiceName != "" { + if rlr.ServiceName != "" { switch h.HardState { case 0: ev.Severity = event.SeverityOK @@ -217,14 +213,9 @@ func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.S // buildDowntimeHistoryMetaEvent from a downtime history entry. func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) { - res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, err - } - - ev, err := client.buildCommonEvent(res) - if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) } switch h.EventType { @@ -236,6 +227,7 @@ func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1hi ev.MuteReason = "Checkable is in downtime" case "downtime_end": + ev.Mute = types.MakeBool(false) if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool { ev.Type = event.TypeDowntimeRemoved ev.Message = "Downtime was cancelled" @@ -257,14 +249,9 @@ func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1hi // buildFlappingHistoryEvent from a flapping history entry. func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { - res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, err - } - - ev, err := client.buildCommonEvent(res) - if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) } if h.PercentStateChangeEnd.Valid { @@ -288,14 +275,9 @@ func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1histor // buildAcknowledgementHistoryEvent from an acknowledgment history entry. func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { - res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId) + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, err - } - - ev, err := client.buildCommonEvent(res) - if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) } if !h.ClearTime.Time().IsZero() { @@ -334,9 +316,6 @@ func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v // // This method usees the Client's logger. func (client *Client) Submit(entity database.Entity) bool { - client.submissionMutex.Lock() - defer client.submissionMutex.Unlock() - if client.ctx.Err() != nil { client.logger.Errorw("Cannot process submitted entity as client context is done", zap.Error(client.ctx.Err())) return true @@ -345,7 +324,7 @@ func (client *Client) Submit(entity database.Entity) bool { var ev *event.Event var eventErr error - // Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go + // Keep the type switch in sync with the values of SyncKeyStructPtrs below. switch h := entity.(type) { case *v1history.AcknowledgementHistory: ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h) @@ -388,36 +367,48 @@ func (client *Client) Submit(entity database.Entity) bool { }), )) - eventRuleIds, err := client.evaluateRulesForObject(client.ctx, entity) - if err != nil { - // While returning false would be more correct, this would result in never being able to refetch new rule - // versions. Consider an invalid object filter expression, which is now impossible to get rid of. - eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err)) - eventRuleIds = []string{} + // The following code accesses Client.rulesInfo. + client.submissionMutex.Lock() + defer client.submissionMutex.Unlock() + + // This loop allows resubmitting an event if the rules have changed. The first try would be the rule update, the + // second try would be the resubmit, and the third try would be for bad luck, e.g., when a second rule update just + // crept in between. If there are three subsequent rule updates, something is wrong. + for try := 0; try < 3; try++ { + eventRuleIds, err := client.evaluateRulesForObject(client.ctx, entity) + if err != nil { + // While returning false would be more correct, this would result in never being able to refetch new rule + // versions. Consider an invalid object filter expression, which is now impossible to get rid of. + eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err)) + eventRuleIds = []string{} + } + + ev.RulesVersion = client.rulesInfo.Version + ev.RuleIds = eventRuleIds + + newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev) + if errors.Is(err, source.ErrRulesOutdated) { + eventLogger.Infow("Received a rule update from Icinga Notification, resubmitting event", + zap.String("old_rules_version", client.rulesInfo.Version), + zap.String("new_rules_version", newEventRules.Version)) + + client.rulesInfo = newEventRules + + continue + } else if err != nil { + eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried", + zap.String("rules_version", client.rulesInfo.Version), + zap.Any("rules", eventRuleIds), + zap.Error(err)) + return false + } + + eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds)) + return true } - ev.RulesVersion = client.rulesInfo.Version - ev.RuleIds = eventRuleIds - - newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev) - if errors.Is(err, source.ErrRulesOutdated) { - eventLogger.Infow("Cannot submit event to Icinga Notifications due to rule changes, will be retried", - zap.String("old_rules_version", client.rulesInfo.Version), - zap.String("new_rules_version", newEventRules.Version)) - - client.rulesInfo = newEventRules - - return false - } else if err != nil { - eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried", - zap.String("rules_version", client.rulesInfo.Version), - zap.Any("rules", eventRuleIds), - zap.Error(err)) - return false - } - - eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds)) - return true + eventLogger.Error("Received three rule updates from Icinga Notifications in a row, event will be retried") + return false } var SyncKeyStructPtrs = map[string]any{ diff --git a/pkg/notifications/redis_fetch.go b/pkg/notifications/redis_fetch.go index 3cacde16..c221400d 100644 --- a/pkg/notifications/redis_fetch.go +++ b/pkg/notifications/redis_fetch.go @@ -24,40 +24,52 @@ import ( // request and return an error indicating that the operation timed out. In case of the serviceId being set, the // maximum execution time of the Redis HGet commands is 10s (5s for each HGet call). func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { - redisHGet := func(typ, field string, out *redisLookupResult) error { + getNameFromRedis := func(typ, id string) (string, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + var data string err := retry.WithBackoff( ctx, - func(ctx context.Context) error { return client.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) }, + func(ctx context.Context) (err error) { + data, err = client.redisClient.HGet(ctx, "icinga:"+typ, id).Result() + return + }, retry.Retryable, backoff.DefaultBackoff, retry.Settings{}, ) if err != nil { if errors.Is(err, redis.Nil) { - return fmt.Errorf("%s with ID %s not found in Redis", typ, hostId) + return "", fmt.Errorf("%s with ID %s not found in Redis", typ, hostId) } - return fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, field, err) + return "", fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, id, err) } - return nil + + var result struct { + Name string `json:"name"` + } + + if err := json.Unmarshal([]byte(data), &result); err != nil { + return "", fmt.Errorf("failed to unmarshal redis result: %w", err) + } + + return result.Name, nil } var result redisLookupResult - if err := redisHGet("host", hostId.String(), &result); err != nil { + var err error + + result.HostName, err = getNameFromRedis("host", hostId.String()) + if err != nil { return nil, err } - result.HostName = result.Name - result.Name = "" // Clear the name field for the host, as we will fetch the service name next. - if serviceId != nil { - if err := redisHGet("service", serviceId.String(), &result); err != nil { + result.ServiceName, err = getNameFromRedis("service", serviceId.String()) + if err != nil { return nil, err } - result.ServiceName = result.Name - result.Name = "" // It's not needed anymore, clear it! } return &result, nil @@ -67,23 +79,4 @@ func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceI type redisLookupResult struct { HostName string `json:"-"` // Name of the host (never empty). ServiceName string `json:"-"` // Name of the service (only set in service context). - - // Name is used to retrieve the host or service name from Redis. - // It should not be used for any other purpose apart from within the [Client.fetchHostServiceName] function. - Name string `json:"name"` -} - -// UnmarshalBinary implements the [encoding.BinaryUnmarshaler] interface for redisLookupResult. -// -// It unmarshals the binary data of the Redis HGet result into the redisLookupResult struct. -// This is required for the HGet().Scan() usage in the [Client.fetchHostServiceName] function to work correctly. -func (rlr *redisLookupResult) UnmarshalBinary(data []byte) error { - if len(data) == 0 { - return errors.New("empty data received for redisLookupResult") - } - - if err := json.Unmarshal(data, rlr); err != nil { - return fmt.Errorf("failed to unmarshal redis result: %w", err) - } - return nil }