mirror of
https://github.com/Icinga/icingadb.git
synced 2026-02-18 18:18:00 -05:00
Retrieve host and service names from Redis
Instead of retrieving the host and service names from the used RDBMs, this commit allows us to query them from Redis. This is done to avoid the overhead of database queries, especially when the host and service names are always to be found in Redis. The previous implementation simply perfomed two database queries with each received entity based on their IDs, but we can perform this operation more efficiently from Redis using the same filtering logic as before. Of course, we now have to maintain more code needed to handle the Redis operations, but this is a trade-off we should be willing to make for performance reasons.
This commit is contained in:
parent
ca3f7c5c9d
commit
7e5b8e5385
4 changed files with 122 additions and 57 deletions
|
|
@ -177,6 +177,7 @@ func run() int {
|
|||
notificationsSource := notifications.NewNotificationsSource(
|
||||
ctx,
|
||||
db,
|
||||
rc,
|
||||
logs.GetChildLogger("notifications-source"),
|
||||
cfg)
|
||||
notificationsSourceCallback = notificationsSource.Submit
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -13,6 +13,7 @@ require (
|
|||
github.com/mattn/go-sqlite3 v1.14.32
|
||||
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/redis/go-redis/v9 v9.16.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/vbauerster/mpb/v6 v6.0.4
|
||||
go.uber.org/zap v1.27.0
|
||||
|
|
@ -34,7 +35,6 @@ require (
|
|||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.12 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/redis/go-redis/v9 v9.16.0 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/ssgreg/journald v1.0.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
|
|
|
|||
|
|
@ -8,12 +8,11 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/icinga/icinga-go-library/backoff"
|
||||
"github.com/icinga/icinga-go-library/database"
|
||||
"github.com/icinga/icinga-go-library/logging"
|
||||
"github.com/icinga/icinga-go-library/notifications"
|
||||
"github.com/icinga/icinga-go-library/notifications/event"
|
||||
"github.com/icinga/icinga-go-library/retry"
|
||||
"github.com/icinga/icinga-go-library/redis"
|
||||
"github.com/icinga/icinga-go-library/types"
|
||||
"github.com/icinga/icinga-go-library/utils"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
|
|
@ -41,6 +40,7 @@ type Source struct {
|
|||
ctxCancel context.CancelFunc
|
||||
|
||||
notificationsClient *notifications.Client // The Icinga Notifications client used to interact with the API.
|
||||
redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events.
|
||||
}
|
||||
|
||||
// NewNotificationsSource creates a new Source connected to an existing database and logger.
|
||||
|
|
@ -49,6 +49,7 @@ type Source struct {
|
|||
func NewNotificationsSource(
|
||||
ctx context.Context,
|
||||
db *database.DB,
|
||||
rc *redis.Client,
|
||||
logger *logging.Logger,
|
||||
cfg notifications.Config,
|
||||
) *Source {
|
||||
|
|
@ -61,7 +62,8 @@ func NewNotificationsSource(
|
|||
db: db,
|
||||
logger: logger,
|
||||
|
||||
rules: ¬ifications.SourceRulesInfo{Version: notifications.EmptyRulesVersion},
|
||||
rules: ¬ifications.SourceRulesInfo{Version: notifications.EmptyRulesVersion},
|
||||
redisClient: rc,
|
||||
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
|
|
@ -136,63 +138,36 @@ func (s *Source) evaluateRulesForObject(ctx context.Context, entity database.Ent
|
|||
return outRuleIds[:len(outRuleIds):len(outRuleIds)], nil
|
||||
}
|
||||
|
||||
// fetchHostServiceName for a host ID and a potential service ID from the Icinga DB relational database.
|
||||
func (s *Source) fetchHostServiceName(ctx context.Context, hostId, serviceId, envId types.Binary) (host, service string, err error) {
|
||||
err = retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) error {
|
||||
queryHost := s.db.Rebind("SELECT name FROM host WHERE id = ? AND environment_id = ?")
|
||||
err := s.db.QueryRowxContext(ctx, queryHost, hostId, envId).Scan(&host)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot select host")
|
||||
}
|
||||
|
||||
if serviceId != nil {
|
||||
queryService := s.db.Rebind("SELECT name FROM service WHERE id = ? AND environment_id = ?")
|
||||
err := s.db.QueryRowxContext(ctx, queryService, serviceId, envId).Scan(&service)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot select service")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
retry.Retryable,
|
||||
backoff.DefaultBackoff,
|
||||
retry.Settings{Timeout: retry.DefaultTimeout})
|
||||
return
|
||||
}
|
||||
|
||||
// buildCommonEvent creates an event.Event based on Host and (optional) Service names.
|
||||
//
|
||||
// This function is used by all event builders to create a common event structure that includes the host and service
|
||||
// names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event.
|
||||
// Any event type-specific information (like severity, message, etc.) is added by the specific event builders.
|
||||
func (s *Source) buildCommonEvent(host, service string) (*event.Event, error) {
|
||||
func (s *Source) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
|
||||
var (
|
||||
eventName string
|
||||
eventUrl *url.URL
|
||||
eventTags map[string]string
|
||||
)
|
||||
|
||||
if service != "" {
|
||||
eventName = host + "!" + service
|
||||
if rlr.ServiceName != "" {
|
||||
eventName = rlr.HostName + "!" + rlr.ServiceName
|
||||
|
||||
eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
|
||||
eventUrl.RawQuery = "name=" + utils.RawUrlEncode(service) + "&host.name=" + utils.RawUrlEncode(host)
|
||||
eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
|
||||
|
||||
eventTags = map[string]string{
|
||||
"host": host,
|
||||
"service": service,
|
||||
"host": rlr.HostName,
|
||||
"service": rlr.ServiceName,
|
||||
}
|
||||
} else {
|
||||
eventName = host
|
||||
eventName = rlr.HostName
|
||||
|
||||
eventUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
|
||||
eventUrl.RawQuery = "name=" + utils.RawUrlEncode(host)
|
||||
eventUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName)
|
||||
|
||||
eventTags = map[string]string{
|
||||
"host": host,
|
||||
"host": rlr.HostName,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -208,19 +183,19 @@ func (s *Source) buildCommonEvent(host, service string) (*event.Event, error) {
|
|||
// The resulted event will have all the necessary information for a state change event, and must
|
||||
// not be further modified by the caller.
|
||||
func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
|
||||
hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId)
|
||||
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot fetch host/service information")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev, err := s.buildCommonEvent(hostName, serviceName)
|
||||
ev, err := s.buildCommonEvent(res)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName)
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
|
||||
}
|
||||
|
||||
ev.Type = event.TypeState
|
||||
|
||||
if serviceName != "" {
|
||||
if res.ServiceName != "" {
|
||||
switch h.HardState {
|
||||
case 0:
|
||||
ev.Severity = event.SeverityOK
|
||||
|
|
@ -256,14 +231,14 @@ func (s *Source) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH
|
|||
|
||||
// buildDowntimeHistoryEvent from a downtime history entry.
|
||||
func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.DowntimeHistory) (*event.Event, error) {
|
||||
hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId)
|
||||
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot fetch host/service information")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev, err := s.buildCommonEvent(hostName, serviceName)
|
||||
ev, err := s.buildCommonEvent(res)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName)
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
|
||||
}
|
||||
|
||||
if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool {
|
||||
|
|
@ -289,14 +264,14 @@ func (s *Source) buildDowntimeHistoryEvent(ctx context.Context, h *v1history.Dow
|
|||
|
||||
// buildFlappingHistoryEvent from a flapping history entry.
|
||||
func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
|
||||
hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId)
|
||||
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot fetch host/service information")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev, err := s.buildCommonEvent(hostName, serviceName)
|
||||
ev, err := s.buildCommonEvent(res)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName)
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
|
||||
}
|
||||
|
||||
if h.PercentStateChangeEnd.Valid {
|
||||
|
|
@ -320,14 +295,14 @@ func (s *Source) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla
|
|||
|
||||
// buildAcknowledgementHistoryEvent from an acknowledgment history entry.
|
||||
func (s *Source) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
|
||||
hostName, serviceName, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId, h.EnvironmentId)
|
||||
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot fetch host/service information")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev, err := s.buildCommonEvent(hostName, serviceName)
|
||||
ev, err := s.buildCommonEvent(res)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", hostName, serviceName)
|
||||
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
|
||||
}
|
||||
|
||||
if !h.ClearTime.Time().IsZero() {
|
||||
|
|
|
|||
89
pkg/notifications/redis_fetch.go
Normal file
89
pkg/notifications/redis_fetch.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package notifications
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/icinga/icinga-go-library/backoff"
|
||||
"github.com/icinga/icinga-go-library/retry"
|
||||
"github.com/icinga/icinga-go-library/types"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// fetchHostServiceName retrieves the host and service names from Redis.
|
||||
//
|
||||
// It uses either the hostId or/and serviceId to fetch the corresponding names. If both are provided,
|
||||
// the returned result will contain the host name and the service name accordingly. Otherwise, it will
|
||||
// only contain the host name.
|
||||
//
|
||||
// Internally, it uses the Redis HGet command to fetch the data from the "icinga:host" and "icinga:service" hashes.
|
||||
// If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the
|
||||
// request and return an error indicating that the operation timed out. In case of the serviceId being set, the
|
||||
// maximum execution time of the Redis HGet commands is 10s (5s for each HGet call).
|
||||
func (s *Source) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
|
||||
redisHGet := func(typ, field string, out *redisLookupResult) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) error { return s.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) },
|
||||
retry.Retryable,
|
||||
backoff.DefaultBackoff,
|
||||
retry.Settings{},
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return fmt.Errorf("%s with ID %s not found in Redis", typ, hostId)
|
||||
}
|
||||
return fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, field, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var result redisLookupResult
|
||||
if err := redisHGet("host", hostId.String(), &result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.HostName = result.Name
|
||||
result.Name = "" // Clear the name field for the host, as we will fetch the service name next.
|
||||
|
||||
if serviceId != nil {
|
||||
if err := redisHGet("service", serviceId.String(), &result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.ServiceName = result.Name
|
||||
result.Name = "" // It's not needed anymore, clear it!
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// redisLookupResult defines the structure of the Redis message we're interested in.
|
||||
type redisLookupResult struct {
|
||||
HostName string `json:"-"` // Name of the host (never empty).
|
||||
ServiceName string `json:"-"` // Name of the service (only set in service context).
|
||||
|
||||
// Name is used to retrieve the host or service name from Redis.
|
||||
// It should not be used for any other purpose apart from within the [Source.fetchHostServiceName] function.
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements the [encoding.BinaryUnmarshaler] interface for redisLookupResult.
|
||||
//
|
||||
// It unmarshals the binary data of the Redis HGet result into the redisLookupResult struct.
|
||||
// This is required for the HGet().Scan() usage in the [Source.fetchHostServiceName] function to work correctly.
|
||||
func (rlr *redisLookupResult) UnmarshalBinary(data []byte) error {
|
||||
if len(data) == 0 {
|
||||
return errors.New("empty data received for redisLookupResult")
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, rlr); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal redis result: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in a new issue