notifications: Fetch customvars from Redis

After reintroducing Event.ExtraTags in the IGL and Icinga Notifications,
Icinga DB populates events by their custom variables.

At the moment, the required customvars are fetched from Redis for each
event. Due to the Redis schema, at least on HGETALL with manual
filtering is required. This might be a good candidate for further
caching, and cache invalidation.
This commit is contained in:
Alvar Penning 2025-10-22 18:07:23 +02:00
parent 1ec561415d
commit 5abb8b4212
No known key found for this signature in database
4 changed files with 140 additions and 41 deletions

2
go.mod
View file

@ -13,7 +13,6 @@ require (
github.com/mattn/go-sqlite3 v1.14.32
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.16.0
github.com/stretchr/testify v1.11.1
github.com/vbauerster/mpb/v6 v6.0.4
go.uber.org/zap v1.27.0
@ -35,6 +34,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.12 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.16.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/ssgreg/journald v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect

View file

@ -543,7 +543,7 @@ func makeCallbackStageFunc(
if len(msgs) == 1 {
backlogLastId = ""
logger.Infow("Finished rolling back backlog of callback elements", zap.Int("delay", backlogMsgCounter))
logger.Infow("Finished rolling back backlog of callback elements", zap.Int("elements", backlogMsgCounter))
} else {
backlogLastId = msgs[1].ID
backlogTimerInterval = backlogTimerMinInterval

View file

@ -39,7 +39,7 @@ type Client struct {
notificationsClient *source.Client // The Icinga Notifications client used to interact with the API.
redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events.
submissionMutex sync.Mutex
submissionMutex sync.Mutex // submissionMutex protects not concurrent safe struct fields in Client.Submit, i.e., rulesInfo.
}
// NewNotificationsClient creates a new Client connected to an existing database and logger.
@ -132,7 +132,7 @@ func (client *Client) buildCommonEvent(
ctx context.Context,
hostId, serviceId types.Binary,
) (*event.Event, *redisLookupResult, error) {
rlr, err := client.fetchHostServiceName(ctx, hostId, serviceId)
rlr, err := client.fetchHostServiceFromRedis(ctx, hostId, serviceId)
if err != nil {
return nil, nil, err
}
@ -143,25 +143,26 @@ func (client *Client) buildCommonEvent(
objectTags map[string]string
)
if rlr.ServiceName != "" {
objectName = rlr.HostName + "!" + rlr.ServiceName
objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
if rlr.serviceName != "" {
objectName = rlr.hostName + "!" + rlr.serviceName
objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(rlr.serviceName) + "&host.name=" + utils.RawUrlEncode(rlr.hostName)
objectTags = map[string]string{
"host": rlr.HostName,
"service": rlr.ServiceName,
"host": rlr.hostName,
"service": rlr.serviceName,
}
} else {
objectName = rlr.HostName
objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(rlr.HostName)
objectName = rlr.hostName
objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(rlr.hostName)
objectTags = map[string]string{
"host": rlr.HostName,
"host": rlr.hostName,
}
}
return &event.Event{
Name: objectName,
URL: objectUrl,
Tags: objectTags,
Name: objectName,
URL: objectUrl,
Tags: objectTags,
ExtraTags: rlr.CustomVars(),
}, rlr, nil
}
@ -177,7 +178,7 @@ func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.S
ev.Type = event.TypeState
if rlr.ServiceName != "" {
if rlr.serviceName != "" {
switch h.HardState {
case 0:
ev.Severity = event.SeverityOK

View file

@ -3,36 +3,56 @@ package notifications
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/retry"
"github.com/icinga/icinga-go-library/types"
"github.com/redis/go-redis/v9"
)
// fetchHostServiceName retrieves the host and service names from Redis.
// redisCustomVar is a customvar entry from Redis.
type redisCustomVar struct {
EnvironmentID types.Binary `json:"environment_id"`
Name string `json:"name"`
Value string `json:"value"`
}
// redisLookupResult defines the structure of the Redis message we're interested in.
type redisLookupResult struct {
hostName string
serviceName string
customVars []*redisCustomVar
}
// CustomVars returns a mapping of customvar names to values.
func (result redisLookupResult) CustomVars() map[string]string {
m := make(map[string]string)
for _, customvar := range result.customVars {
m[customvar.Name] = customvar.Value
}
return m
}
// fetchHostServiceFromRedis retrieves the host and service names and customvars from Redis.
//
// It uses either the hostId or/and serviceId to fetch the corresponding names. If both are provided,
// the returned result will contain the host name and the service name accordingly. Otherwise, it will
// only contain the host name.
//
// Internally, it uses the Redis HGet command to fetch the data from the "icinga:host" and "icinga:service" hashes.
// If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the
// request and return an error indicating that the operation timed out. In case of the serviceId being set, the
// maximum execution time of the Redis HGet commands is 10s (5s for each HGet call).
func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
getNameFromRedis := func(typ, id string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// The function has a hard coded timeout of five seconds for all HGET and HGETALL commands together.
func (client *Client) fetchHostServiceFromRedis(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
hgetFromRedis := func(key, id string) (string, error) {
var data string
err := retry.WithBackoff(
ctx,
func(ctx context.Context) (err error) {
data, err = client.redisClient.HGet(ctx, "icinga:"+typ, id).Result()
data, err = client.redisClient.HGet(ctx, key, id).Result()
return
},
retry.Retryable,
@ -40,16 +60,21 @@ func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceI
retry.Settings{},
)
if err != nil {
if errors.Is(err, redis.Nil) {
return "", fmt.Errorf("%s with ID %s not found in Redis", typ, hostId)
}
return "", fmt.Errorf("failed to fetch %s with ID %s from Redis: %w", typ, id, err)
return "", fmt.Errorf("redis hget %q, %q failed: %w", key, id, err)
}
return data, nil
}
getNameFromRedis := func(typ, id string) (string, error) {
data, err := hgetFromRedis("icinga:"+typ, id)
if err != nil {
return "", err
}
var result struct {
Name string `json:"name"`
}
if err := json.Unmarshal([]byte(data), &result); err != nil {
return "", fmt.Errorf("failed to unmarshal redis result: %w", err)
}
@ -57,26 +82,99 @@ func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceI
return result.Name, nil
}
getCustomVarFromRedis := func(id string) (*redisCustomVar, error) {
data, err := hgetFromRedis("icinga:customvar", id)
if err != nil {
return nil, err
}
customvar := new(redisCustomVar)
if err := json.Unmarshal([]byte(data), customvar); err != nil {
return nil, fmt.Errorf("failed to unmarshal redis result: %w", err)
}
return customvar, nil
}
getObjectCustomVarsFromRedis := func(typ, id string) ([]*redisCustomVar, error) {
var resMap map[string]string
err := retry.WithBackoff(
ctx,
func(ctx context.Context) (err error) {
res := client.redisClient.HGetAll(ctx, "icinga:"+typ+":customvar")
if err = res.Err(); err != nil {
return
}
resMap, err = res.Result()
return
},
retry.Retryable,
backoff.DefaultBackoff,
retry.Settings{},
)
if err != nil {
return nil, fmt.Errorf("failed to HGETALL icinga:%s:customvar from Redis: %w", typ, err)
}
var result struct {
CustomvarId string `json:"customvar_id"`
HostId string `json:"host_id"`
ServiceId string `json:"service_id"`
}
var customvars []*redisCustomVar
for _, res := range resMap {
if err := json.Unmarshal([]byte(res), &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal redis result: %w", err)
}
switch typ {
case "host":
if result.HostId != id {
continue
}
case "service":
if result.ServiceId != id {
continue
}
default:
panic(fmt.Sprintf("unexpected object type %q", typ))
}
customvar, err := getCustomVarFromRedis(result.CustomvarId)
if err != nil {
return nil, fmt.Errorf("failed to fetch customvar: %w", err)
}
customvars = append(customvars, customvar)
}
return customvars, nil
}
var result redisLookupResult
var err error
result.HostName, err = getNameFromRedis("host", hostId.String())
result.hostName, err = getNameFromRedis("host", hostId.String())
if err != nil {
return nil, err
}
if serviceId != nil {
result.ServiceName, err = getNameFromRedis("service", serviceId.String())
result.serviceName, err = getNameFromRedis("service", serviceId.String())
if err != nil {
return nil, err
}
}
if serviceId == nil {
result.customVars, err = getObjectCustomVarsFromRedis("host", hostId.String())
} else {
result.customVars, err = getObjectCustomVarsFromRedis("service", serviceId.String())
}
if err != nil {
return nil, err
}
return &result, nil
}
// redisLookupResult defines the structure of the Redis message we're interested in.
type redisLookupResult struct {
HostName string `json:"-"` // Name of the host (never empty).
ServiceName string `json:"-"` // Name of the service (only set in service context).
}