diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index bd2a14ce..c7fd403c 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -177,6 +177,7 @@ func run() int { notificationsSource := notifications.NewNotificationsSource( ctx, db, + rc, logs.GetChildLogger("notifications-source"), cfg) notificationsSourceCallback = notificationsSource.Submit diff --git a/go.mod b/go.mod index e60ed366..a392bb42 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.32 github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.16.0 github.com/stretchr/testify v1.11.1 github.com/vbauerster/mpb/v6 v6.0.4 go.uber.org/zap v1.27.0 @@ -34,7 +35,6 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.12 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/redis/go-redis/v9 v9.16.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go index e43d70f4..41655881 100644 --- a/pkg/notifications/notifications.go +++ b/pkg/notifications/notifications.go @@ -8,12 +8,11 @@ import ( "sync" "time" - "github.com/icinga/icinga-go-library/backoff" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/notifications" "github.com/icinga/icinga-go-library/notifications/event" - "github.com/icinga/icinga-go-library/retry" + "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-go-library/utils" "github.com/icinga/icingadb/pkg/common" @@ -41,6 +40,7 @@ type Source struct { ctxCancel context.CancelFunc notificationsClient *notifications.Client // The Icinga Notifications client used to interact with the API. + redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events. } // NewNotificationsSource creates a new Source connected to an existing database and logger. @@ -49,6 +49,7 @@ type Source struct { func NewNotificationsSource( ctx context.Context, db *database.DB, + rc *redis.Client, logger *logging.Logger, cfg notifications.Config, ) *Source { @@ -61,7 +62,8 @@ func NewNotificationsSource( db: db, logger: logger, - rules: ¬ifications.SourceRulesInfo{Version: notifications.EmptyRulesVersion}, + rules: ¬ifications.SourceRulesInfo{Version: notifications.EmptyRulesVersion}, + redisClient: rc, ctx: ctx, ctxCancel: ctxCancel, @@ -136,63 +138,36 @@ func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Ent return outRuleIds[:len(outRuleIds):len(outRuleIds)], nil } -// fetchHostServiceName for a host ID and a potential service ID from the Icinga DB relational database. -func (s *Source) fetchHostServiceName(ctx context.Context, hostId, serviceId, envId types.Binary) (host, service string, err error) { - err = retry.WithBackoff( - ctx, - func(ctx context.Context) error { - queryHost := s.db.Rebind("SELECT name FROM host WHERE id = ? AND environment_id = ?") - err := s.db.QueryRowxContext(ctx, queryHost, hostId, envId).Scan(&host) - if err != nil { - return errors.Wrap(err, "cannot select host") - } - - if serviceId != nil { - queryService := s.db.Rebind("SELECT name FROM service WHERE id = ? AND environment_id = ?") - err := s.db.QueryRowxContext(ctx, queryService, serviceId, envId).Scan(&service) - if err != nil { - return errors.Wrap(err, "cannot select service") - } - } - - return nil - }, - retry.Retryable, - backoff.DefaultBackoff, - retry.Settings{Timeout: retry.DefaultTimeout}) - return -} - // buildCommonEvent creates an event.Event based on Host and (optional) Service names. // // This function is used by all event builders to create a common event structure that includes the host and service // names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event. // Any event type-specific information (like severity, message, etc.) is added by the specific event builders. -func (s *Source) buildCommonEvent(host, service string) (*event.Event, error) { +func (s *Source) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) { var ( eventName string eventUrl *url.URL eventTags map[string]string ) - if service != "" { - eventName = host + "!" + service + if rlr.ServiceName != "" { + eventName = rlr.HostName + "!" + rlr.ServiceName eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service") - eventUrl.RawQuery = "name=" + utils.RawUrlEncode(service) + "&host.name=" + utils.RawUrlEncode(host) + eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName) eventTags = map[string]string{ - "host": host, - "service": service, + "host": rlr.HostName, + "service": rlr.ServiceName, } } else { - eventName = host + eventName = rlr.HostName eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host") - eventUrl.RawQuery = "name=" + utils.RawUrlEncode(host) + eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName) eventTags = map[string]string{ - "host": host, + "host": rlr.HostName, } } @@ -208,19 +183,19 @@ func (s *Source) buildCommonEvent(host, service string) (*event.Event, error) { // The resulted event will have all the necessary information for a state change event, and must // not be further modified by the caller. func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { - hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId) + res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, errors.Wrap(err, "cannot fetch host/service information") + return nil, err } - ev, err := s.buildCommonEvent(hostName, serviceName) + ev, err := s.buildCommonEvent(res) if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } ev.Type = event.TypeState - if serviceName != "" { + if res.ServiceName != "" { switch h.HardState { case 0: ev.Severity = event.SeverityOK @@ -256,14 +231,14 @@ func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH // buildDowntimeHistoryEvent from a downtime history entry. func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.DowntimeHistory) (*event.Event, error) { - hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId) + res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, errors.Wrap(err, "cannot fetch host/service information") + return nil, err } - ev, err := s.buildCommonEvent(hostName, serviceName) + ev, err := s.buildCommonEvent(res) if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool { @@ -289,14 +264,14 @@ func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.Dow // buildFlappingHistoryEvent from a flapping history entry. func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { - hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId) + res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, errors.Wrap(err, "cannot fetch host/service information") + return nil, err } - ev, err := s.buildCommonEvent(hostName, serviceName) + ev, err := s.buildCommonEvent(res) if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } if h.PercentStateChangeEnd.Valid { @@ -320,14 +295,14 @@ func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla // buildAcknowledgementHistoryEvent from an acknowledgment history entry. func (s *Source) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { - hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId) + res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId) if err != nil { - return nil, errors.Wrap(err, "cannot fetch host/service information") + return nil, err } - ev, err := s.buildCommonEvent(hostName, serviceName) + ev, err := s.buildCommonEvent(res) if err != nil { - return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName) + return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName) } if !h.ClearTime.Time().IsZero() { diff --git a/pkg/notifications/redis_fetch.go b/pkg/notifications/redis_fetch.go new file mode 100644 index 00000000..8d61d776 --- /dev/null +++ b/pkg/notifications/redis_fetch.go @@ -0,0 +1,89 @@ +package notifications + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/icinga/icinga-go-library/backoff" + "github.com/icinga/icinga-go-library/retry" + "github.com/icinga/icinga-go-library/types" + "github.com/redis/go-redis/v9" +) + +// fetchHostServiceName retrieves the host and service names from Redis. +// +// It uses either the hostId or/and serviceId to fetch the corresponding names. If both are provided, +// the returned result will contain the host name and the service name accordingly. Otherwise, it will +// only contain the host name. +// +// Internally, it uses the Redis HGet command to fetch the data from the "icinga:host" and "icinga:service" hashes. +// If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the +// request and return an error indicating that the operation timed out. In case of the serviceId being set, the +// maximum execution time of the Redis HGet commands is 10s (5s for each HGet call). +func (s *Source) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) { + redisHGet := func(typ, field string, out *redisLookupResult) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err := retry.WithBackoff( + ctx, + func(ctx context.Context) error { return s.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) }, + retry.Retryable, + backoff.DefaultBackoff, + retry.Settings{}, + ) + if err != nil { + if errors.Is(err, redis.Nil) { + return fmt.Errorf("%s with ID %s not found in Redis", typ, hostId) + } + return fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, field, err) + } + return nil + } + + var result redisLookupResult + if err := redisHGet("host", hostId.String(), &result); err != nil { + return nil, err + } + + result.HostName = result.Name + result.Name = "" // Clear the name field for the host, as we will fetch the service name next. + + if serviceId != nil { + if err := redisHGet("service", serviceId.String(), &result); err != nil { + return nil, err + } + result.ServiceName = result.Name + result.Name = "" // It's not needed anymore, clear it! + } + + return &result, nil +} + +// redisLookupResult defines the structure of the Redis message we're interested in. +type redisLookupResult struct { + HostName string `json:"-"` // Name of the host (never empty). + ServiceName string `json:"-"` // Name of the service (only set in service context). + + // Name is used to retrieve the host or service name from Redis. + // It should not be used for any other purpose apart from within the [Source.fetchHostServiceName] function. + Name string `json:"name"` +} + +// UnmarshalBinary implements the [encoding.BinaryUnmarshaler] interface for redisLookupResult. +// +// It unmarshals the binary data of the Redis HGet result into the redisLookupResult struct. +// This is required for the HGet().Scan() usage in the [Source.fetchHostServiceName] function to work correctly. +func (rlr *redisLookupResult) UnmarshalBinary(data []byte) error { + if len(data) == 0 { + return errors.New("empty data received for redisLookupResult") + } + + if err := json.Unmarshal(data, rlr); err != nil { + return fmt.Errorf("failed to unmarshal redis result: %w", err) + } + return nil +}