notifications: Import StreamSorter Logic
The whole StreamSorter logic is required only by Icinga Notifications. The implementation was therefore moved from the history package to the notifications package, dropping some unnecessary generalizations along the way. This results in substantial changes to the notifications package, while other modules are mostly unaffected.
Parent: c6368b1f82
Commit: e012ef6d1b

5 changed files with 94 additions and 131 deletions
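At its core, the commit swaps the history sync's single optional callback for per-pipeline extra stages. A hedged before/after of the affected signature, assembled from the hunks below:

    // Old: one optional callback, configured through a struct (removed in this commit).
    func (s Sync) Sync(ctx context.Context, callbackCfg *SyncCallbackConf) error

    // New: an optional extra stage per pipeline key, appended after all other stages.
    func (s Sync) Sync(ctx context.Context, extraStages map[string]StageFunc) error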
@@ -170,8 +170,7 @@ func run() int {
     signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)

     {
-        var callbackCfg *history.SyncCallbackConf
+        var extraStages map[string]history.StageFunc
         if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
             logger.Info("Starting Icinga Notifications source")
@@ -185,17 +184,13 @@ func run() int {
                logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err))
            }

-           callbackCfg = &history.SyncCallbackConf{
-               StatPtr:      &telemetry.Stats.NotificationSync,
-               KeyStructPtr: notifications.SyncKeyStructPtrs,
-               Fn:           notificationsSource.Submit,
-           }
+           extraStages = notificationsSource.SyncExtraStages()
        }

        go func() {
            logger.Info("Starting history sync")

-           if err := hs.Sync(ctx, callbackCfg); err != nil && !utils.IsContextCanceled(err) {
+           if err := hs.Sync(ctx, extraStages); err != nil && !utils.IsContextCanceled(err) {
                logger.Fatalf("%+v", err)
            }
        }()
@@ -2,7 +2,6 @@ package history

 import (
     "context"
     "fmt"
-    "github.com/icinga/icinga-go-library/com"
     "github.com/icinga/icinga-go-library/database"
     "github.com/icinga/icinga-go-library/logging"
@@ -16,7 +15,6 @@ import (
     v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
     "github.com/icinga/icingadb/pkg/icingaredis/telemetry"
     "github.com/pkg/errors"
-    "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
     "reflect"
     "slices"
@@ -30,17 +28,6 @@ type Sync struct {
     logger *logging.Logger
 }

-// SyncCallbackConf configures a callback stage given to Sync.Sync.
-type SyncCallbackConf struct {
-    // StatPtr refers to a [com.Counter] from the [telemetry.Stats] struct, e.g., Stats.NotificationSync.
-    StatPtr *com.Counter
-    // KeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If
-    // a key is missing from the map, it will not be used for the callback.
-    KeyStructPtr map[string]any
-    // Fn is the actual callback function.
-    Fn func(database.Entity) bool
-}
-
 // NewSync creates a new Sync.
 func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync {
     return &Sync{
@@ -52,19 +39,9 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
 // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
 //
-// It is possible to enable a callback functionality, e.g., for the Icinga Notifications integration. To do so, the
-// callbackCfg must be set according to the SyncCallbackConf struct documentation.
-func (s Sync) Sync(ctx context.Context, callbackCfg *SyncCallbackConf) error {
-    var callbackStageFn stageFunc
-    if callbackCfg != nil {
-        callbackStageFn = makeSortedCallbackStageFunc(
-            ctx,
-            s.logger,
-            callbackCfg.StatPtr,
-            callbackCfg.KeyStructPtr,
-            callbackCfg.Fn)
-    }
-
+// The optional extraStages parameter allows specifying an additional extra stage for each pipeline, identified by its
+// key. This stage is executed after every other stage, but before the entry gets deleted from Redis.
+func (s Sync) Sync(ctx context.Context, extraStages map[string]StageFunc) error {
     g, ctx := errgroup.WithContext(ctx)

     for key, pipeline := range syncPipelines {
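Spelled out as a hedged usage sketch of the new parameter (db, rc, logger, ctx, and myStateStage are hypothetical placeholders, not part of this commit):

    // Attach one extra stage to the "state" pipeline only; pipelines without
    // a map entry run unchanged.
    hs := NewSync(db, rc, logger)
    if err := hs.Sync(ctx, map[string]StageFunc{
        SyncPipelineState: myStateStage, // appended after all built-in stages
    }); err != nil {
        logger.Fatalf("%+v", err)
    }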
@@ -90,19 +67,16 @@ func (s Sync) Sync(ctx context.Context, callbackCfg *SyncCallbackConf) error {
        // forward the entry after it has completed its own sync so that later stages can rely on previous stages being
        // executed successfully.
        //
-       // If a callback exists for this key, it will be appended to the pipeline. Thus, it is executed after every
+       // If an extra stage exists for this key, it will be appended to the pipeline. Thus, it is executed after every
        // other pipeline action, but before deleteFromRedis.

-       var hasCallbackStage bool
-       if callbackCfg != nil {
-           _, exists := callbackCfg.KeyStructPtr[key]
-           hasCallbackStage = exists
-       }
-
        // Shadowed variable to allow appending custom callbacks.
        pipeline := pipeline
-       if hasCallbackStage {
-           pipeline = append(slices.Clip(pipeline), callbackStageFn)
+       if extraStages != nil {
+           extraStage, ok := extraStages[key]
+           if ok {
+               pipeline = append(slices.Clip(pipeline), extraStage)
+           }
        }

        ch := make([]chan redis.XMessage, len(pipeline)+1)
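The slices.Clip call here is a design choice worth spelling out, since appending to the shared map value would otherwise be risky:

    // slices.Clip(pipeline) is equivalent to pipeline[:len(pipeline):len(pipeline)]:
    // it drops spare capacity, so the append below must allocate a fresh backing
    // array instead of possibly writing into the shared, package-level
    // syncPipelines slice.
    pipeline = append(slices.Clip(pipeline), extraStage)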
@@ -200,19 +174,19 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
     }
 }

-// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
+// StageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
 // once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information
 // about which pipeline this function is running in, i.e. "notification"), an in channel for the stage to read history
 // events from and an out channel to forward history entries to after processing them successfully. A stage function
 // is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On
 // error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis
 // and can be processed at a later time.
-type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error
+type StageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error

-// writeOneEntityStage creates a stageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
+// writeOneEntityStage creates a StageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
 // For each history event it receives, it parses that event into a new instance of that entity type and writes it to
 // the database. It writes exactly one entity to the database for each history event.
-func writeOneEntityStage(structPtr any) stageFunc {
+func writeOneEntityStage(structPtr any) StageFunc {
     structifier := structify.MakeMapStructifier(
         reflect.TypeOf(structPtr).Elem(),
         "json",
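To make the documented contract concrete, here is a minimal hedged sketch of a conforming stage; passthroughStage is hypothetical and not part of this commit:

    // passthroughStage forwards every message unchanged from in to out. On
    // context cancellation nothing more is forwarded, so unprocessed events
    // stay in Redis and can be retried later.
    func passthroughStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
        defer close(out)

        for {
            select {
            case msg, ok := <-in:
                if !ok {
                    return nil // upstream closed; this stage is done
                }
                // A real stage would process msg here and, on failure, return
                // without forwarding it, so the event is not deleted from Redis.
                select {
                case out <- msg:
                case <-ctx.Done():
                    return ctx.Err()
                }
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }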
@@ -231,9 +205,9 @@ func writeOneEntityStage(structPtr any) stageFunc {
     })
 }

-// writeMultiEntityStage creates a stageFunc from a function that takes a history event as an input and returns a
+// writeMultiEntityStage creates a StageFunc from a function that takes a history event as an input and returns a
 // (potentially empty) slice of v1.UpserterEntity instances that it then inserts into the database.
-func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) stageFunc {
+func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) StageFunc {
     return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
         type State struct {
             Message redis.XMessage // Original event from Redis.
@@ -345,7 +319,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse
     }
 }

-// userNotificationStage is a specialized stageFunc that populates the user_notification_history table. It is executed
+// userNotificationStage is a specialized StageFunc that populates the user_notification_history table. It is executed
 // on the notification history stream and uses the users_notified_ids attribute to create an entry in the
 // user_notification_history relation table for each user ID.
 func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
@@ -404,9 +378,8 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
 // countElementStage increments the [Stats.History] counter.
 //
-// This stageFunc should be called last in a [syncPipeline]. Thus, it is still executed before the final
-// Sync.deleteFromRedis call in Sync.Sync. Furthermore, an optional callback function will be appended after this stage,
-// resulting in an incremented history state counter for synchronized history, but stalling callback actions.
+// This StageFunc should be called last in each syncPipeline. Thus, it is executed before the final
+// Sync.deleteFromRedis call in Sync.Sync, and also before any optional extra stage, which may block.
 func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
     defer close(out)

@@ -426,72 +399,6 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM
     }
 }

-// makeSortedCallbackStageFunc creates a new stageFunc calling the callback function after reordering messages.
-//
-// This stageFunc is designed to be used by multiple channels. The internal sorting logic - realized by a StreamSorter -
-// results in all messages being sorted based on their Redis Stream ID and ejected to the callback function in this
-// order.
-//
-// The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key.
-//
-// The callback call is blocking and the message will be forwarded to the out channel after the function has returned.
-// Thus, please ensure this function does not block too long.
-//
-// If the callback function returns false, the message will be retried after an increasing backoff. All subsequent
-// messages will wait until this one succeeds.
-//
-// For each successfully submitted message, the telemetry stat referenced via a pointer is incremented. Thus, a delta
-// between telemetry.Stats.History and this stat indicates blocking callbacks.
-func makeSortedCallbackStageFunc(
-    ctx context.Context,
-    logger *logging.Logger,
-    statPtr *com.Counter,
-    keyStructPtrs map[string]any,
-    fn func(database.Entity) bool,
-) stageFunc {
-    sorterCallbackFn := func(msg redis.XMessage, key string) bool {
-        makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) {
-            structPtr, ok := keyStructPtrs[key]
-            if !ok {
-                return nil, fmt.Errorf("key is not part of keyStructPtrs")
-            }
-
-            structifier := structify.MakeMapStructifier(
-                reflect.TypeOf(structPtr).Elem(),
-                "json",
-                contracts.SafeInit)
-            val, err := structifier(values)
-            if err != nil {
-                return nil, errors.Wrapf(err, "can't structify values %#v for %q", values, key)
-            }
-
-            entity, ok := val.(database.Entity)
-            if !ok {
-                return nil, fmt.Errorf("structifier returned %T which does not implement database.Entity", val)
-            }
-
-            return entity, nil
-        }
-
-        entity, err := makeEntity(key, msg.Values)
-        if err != nil {
-            logger.Errorw("Failed to create database.Entity out of Redis stream message",
-                zap.Error(err),
-                zap.String("key", key),
-                zap.String("id", msg.ID))
-            return false
-        }
-
-        success := fn(entity)
-        if success {
-            statPtr.Add(1)
-        }
-        return success
-    }
-
-    return NewStreamSorter(ctx, logger, sorterCallbackFn).PipelineFunc
-}
-
 const (
     SyncPipelineAcknowledgement = "acknowledgement"
     SyncPipelineComment         = "comment"
@@ -501,7 +408,7 @@ const (
     SyncPipelineState = "state"
 )

-var syncPipelines = map[string][]stageFunc{
+var syncPipelines = map[string][]StageFunc{
     SyncPipelineAcknowledgement: {
         writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
         writeOneEntityStage((*v1.HistoryAck)(nil)),             // history (depends on acknowledgement_history)
@@ -9,15 +9,19 @@ import (
     "github.com/icinga/icinga-go-library/notifications/event"
     "github.com/icinga/icinga-go-library/notifications/source"
     "github.com/icinga/icinga-go-library/redis"
+    "github.com/icinga/icinga-go-library/structify"
     "github.com/icinga/icinga-go-library/types"
     "github.com/icinga/icinga-go-library/utils"
     "github.com/icinga/icingadb/internal"
     "github.com/icinga/icingadb/pkg/common"
+    "github.com/icinga/icingadb/pkg/contracts"
     "github.com/icinga/icingadb/pkg/icingadb/history"
     v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history"
+    "github.com/icinga/icingadb/pkg/icingaredis/telemetry"
    "github.com/pkg/errors"
     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
+    "reflect"
     "sync"
 )
@@ -463,9 +467,64 @@ func (client *Client) Submit(entity database.Entity) bool {
     return false
 }

-var SyncKeyStructPtrs = map[string]any{
-    history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil),
-    history.SyncPipelineDowntime:        (*v1history.DowntimeHistoryMeta)(nil),
-    history.SyncPipelineFlapping:        (*v1history.FlappingHistory)(nil),
-    history.SyncPipelineState:           (*v1history.StateHistory)(nil),
-}
+// SyncExtraStages returns a map of history sync keys to [history.StageFunc] to be used for [history.Sync].
+//
+// Passing the return value of this method as the extraStages parameter to [history.Sync] results in forwarding events
+// from the Icinga DB history stream to Icinga Notifications after being resorted via the StreamSorter.
+func (client *Client) SyncExtraStages() map[string]history.StageFunc {
+    var syncKeyStructPtrs = map[string]any{
+        history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil),
+        history.SyncPipelineDowntime:        (*v1history.DowntimeHistoryMeta)(nil),
+        history.SyncPipelineFlapping:        (*v1history.FlappingHistory)(nil),
+        history.SyncPipelineState:           (*v1history.StateHistory)(nil),
+    }
+
+    sorterCallbackFn := func(msg redis.XMessage, key string) bool {
+        makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) {
+            structPtr, ok := syncKeyStructPtrs[key]
+            if !ok {
+                return nil, fmt.Errorf("key is not part of keyStructPtrs")
+            }
+
+            structifier := structify.MakeMapStructifier(
+                reflect.TypeOf(structPtr).Elem(),
+                "json",
+                contracts.SafeInit)
+            val, err := structifier(values)
+            if err != nil {
+                return nil, errors.Wrapf(err, "can't structify values %#v for %q", values, key)
+            }
+
+            entity, ok := val.(database.Entity)
+            if !ok {
+                return nil, fmt.Errorf("structifier returned %T which does not implement database.Entity", val)
+            }
+
+            return entity, nil
+        }
+
+        entity, err := makeEntity(key, msg.Values)
+        if err != nil {
+            client.logger.Errorw("Failed to create database.Entity out of Redis stream message",
+                zap.Error(err),
+                zap.String("key", key),
+                zap.String("id", msg.ID))
+            return false
+        }
+
+        success := client.Submit(entity)
+        if success {
+            telemetry.Stats.NotificationSync.Add(1)
+        }
+        return success
+    }
+
+    pipelineFn := NewStreamSorter(client.ctx, client.logger, sorterCallbackFn).PipelineFunc
+
+    extraStages := make(map[string]history.StageFunc)
+    for k := range syncKeyStructPtrs {
+        extraStages[k] = pipelineFn
+    }
+
+    return extraStages
+}
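The makeEntity closure above follows a fixed decoding pattern. As a hedged, self-contained illustration of that pattern for a single type (decodeStateEntity is a hypothetical helper, not part of this commit):

    // decodeStateEntity mirrors makeEntity above for one fixed type: it decodes
    // the values of a Redis stream message from the "state" stream into a
    // *v1history.StateHistory via its "json" struct tags.
    func decodeStateEntity(values map[string]interface{}) (database.Entity, error) {
        structifier := structify.MakeMapStructifier(
            reflect.TypeOf((*v1history.StateHistory)(nil)).Elem(),
            "json",
            contracts.SafeInit)

        val, err := structifier(values)
        if err != nil {
            return nil, errors.Wrap(err, "can't structify state history values")
        }

        // *v1history.StateHistory implements database.Entity.
        return val.(database.Entity), nil
    }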
@@ -1,4 +1,4 @@
-package history
+package notifications

 import (
     "container/heap"
@@ -6,6 +6,7 @@ import (
     "fmt"
     "github.com/icinga/icinga-go-library/logging"
     "github.com/icinga/icinga-go-library/redis"
+    "github.com/icinga/icingadb/pkg/icingadb/history"
     "github.com/pkg/errors"
     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
@@ -347,7 +348,7 @@ func (sorter *StreamSorter) submit(msg redis.XMessage, key string, out chan<- re
     }
 }

-// PipelineFunc implements the interface expected for a history sync pipeline stage.
+// PipelineFunc implements the [history.StageFunc] type expected for a history sync pipeline stage.
 //
 // This method of a single StreamSorter can be inserted into multiple history sync pipelines and will forward all
 // messages from in to out as expected from a pipeline stage. In between, all messages are processed by the
@@ -355,7 +356,7 @@ func (sorter *StreamSorter) submit(msg redis.XMessage, key string, out chan<- re
 // according to its specification (see the comment on the StreamSorter type).
 func (sorter *StreamSorter) PipelineFunc(
     ctx context.Context,
-    s Sync,
+    _ history.Sync,
     key string,
     in <-chan redis.XMessage,
     out chan<- redis.XMessage,
@@ -395,7 +396,7 @@ func (sorter *StreamSorter) PipelineFunc(

     err := sorter.submit(msg, key, out)
     if err != nil {
-        s.logger.Errorw("Failed to submit Redis stream event to stream sorter",
+        sorter.logger.Errorw("Failed to submit Redis stream event to stream sorter",
             zap.String("key", key),
             zap.Error(err))
     }
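Because the Sync parameter is now ignored and all state lives in the sorter itself, one StreamSorter instance can serve several pipelines, which is exactly how SyncExtraStages uses it. A hedged sketch (ctx, logger, and callbackFn are hypothetical placeholders):

    // One sorter shared by two pipelines: both stages feed the same internal
    // heap, so the callback sees messages from both streams in global Redis
    // stream ID order.
    sorter := NewStreamSorter(ctx, logger, callbackFn)
    extraStages := map[string]history.StageFunc{
        history.SyncPipelineState:    sorter.PipelineFunc,
        history.SyncPipelineDowntime: sorter.PipelineFunc,
    }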
@@ -1,5 +1,5 @@
 // #nosec G404 -- Allow math/rand for the tests
-package history
+package notifications

 import (
     "cmp"
@@ -7,6 +7,7 @@ import (
     "fmt"
     "github.com/icinga/icinga-go-library/logging"
     "github.com/icinga/icinga-go-library/redis"
+    "github.com/icinga/icingadb/pkg/icingadb/history"
     "github.com/stretchr/testify/require"
     "go.uber.org/zap/zaptest"
     "math/rand"
@@ -263,7 +264,7 @@ func TestStreamSorter(t *testing.T) {
     in := make(chan redis.XMessage)
     out := make(chan redis.XMessage)
     go func() {
-        require.NoError(t, sorter.PipelineFunc(context.Background(), Sync{}, "", in, out))
+        require.NoError(t, sorter.PipelineFunc(context.Background(), history.Sync{}, "", in, out))
     }()

     if !earlyClose {