diff --git a/pkg/icingadb/history/sorter.go b/pkg/icingadb/history/sorter.go index d079094f..ba81954d 100644 --- a/pkg/icingadb/history/sorter.go +++ b/pkg/icingadb/history/sorter.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "slices" "strconv" "strings" "time" @@ -44,9 +43,9 @@ func parseRedisStreamId(redisStreamId string) (int64, int64, error) { // are not precise enough. type streamSorterSubmission struct { // msg is the Redis message to be forwarded to out after this submission was sorted. - msg redis.XMessage - args any - out chan<- redis.XMessage + msg redis.XMessage + key string + out chan<- redis.XMessage // Required for sorting. streamIdMs int64 // streamIdMs is the Redis Stream ID timestamp part (milliseconds) @@ -55,10 +54,11 @@ type streamSorterSubmission struct { } // MarshalLogObject implements [zapcore.ObjectMarshaler]. -func (sub streamSorterSubmission) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (sub *streamSorterSubmission) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("redis-id-ms", sub.streamIdMs) encoder.AddInt64("redis-id-seq", sub.streamIdSeq) encoder.AddTime("submit-time", sub.submitTime) + encoder.AddString("out", fmt.Sprint(sub.out)) return nil } @@ -103,26 +103,37 @@ func (subs *streamSorterSubmissions) Pop() any { return x } -// StreamSorter accepts multiple [redis.XMessage] via Submit and ejects them in an ordered fashion. +// Peek returns the smallest element from the heap without removing it, or nil if the heap is empty. +func (subs streamSorterSubmissions) Peek() *streamSorterSubmission { + if len(subs) > 0 { + return subs[0] + } else { + return nil + } +} + +// StreamSorter is a helper that can be used to intercept messages from different history sync pipelines and passes them +// to a callback in the order given by their Redis Stream ID (sorted across all involved streams). // -// Internally, two goroutines are used. 
The first one collects the submissions and sorts them into a heap based on the -// Redis Stream ID. After being in the heap for at least three seconds, a submission is forwarded to the other -// goroutine. There, each element is passed to the callback function in order. Only if the callback function has -// succeeded, it is removed from the top of the queue. +// After a message is received, it is kept in a priority queue for three seconds to wait for possible messages from +// another stream with a smaller ID. Thus, if a message is received delayed for more than three seconds, it will be +// relayed out of order. The StreamSorter is only able to ensure order to a certain degree of chaos. // -// Thus, if a message is received delayed for more than three seconds, it will be relayed out of order. The StreamSorter -// is only able to ensure order to a certain degree of chaos. -// -// The callback function receives the [redis.XMessage] together with generic args passed in Submit for additional -// context. If the callback function returns true, the element will be removed from the queue. Otherwise, the element -// will be kept at top of the queue and retried next time. +// The callback function receives the [redis.XMessage] together with the Redis stream name (key) for additional +// context. The callback function is supposed to return true on success. Otherwise, the callback will be retried until +// it succeeds. 
type StreamSorter struct { - ctx context.Context - logger *logging.Logger - callbackFn func(redis.XMessage, any) bool - submissionCh chan *streamSorterSubmission - closeChSubmission chan chan<- redis.XMessage - closeChQueue chan chan<- redis.XMessage + ctx context.Context + logger *logging.Logger + callbackFn func(redis.XMessage, string) bool + submissionCh chan *streamSorterSubmission + + // registerOutCh is used by PipelineFunc() to register output channels with worker() + registerOutCh chan chan<- redis.XMessage + + // closeOutCh is used by PipelineFunc() to signal to worker() that there will be no more submissions destined for + // that output channel and it can be closed by the worker after it processed all pending submissions for it. + closeOutCh chan chan<- redis.XMessage // verbose implies a verbose debug logging. Don't think one want to have this outside the tests. verbose bool @@ -132,218 +143,170 @@ type StreamSorter struct { func NewStreamSorter( ctx context.Context, logger *logging.Logger, - callbackFn func(msg redis.XMessage, args any) bool, + callbackFn func(msg redis.XMessage, key string) bool, ) *StreamSorter { sorter := &StreamSorter{ - ctx: ctx, - logger: logger, - callbackFn: callbackFn, - submissionCh: make(chan *streamSorterSubmission), - closeChSubmission: make(chan chan<- redis.XMessage), - closeChQueue: make(chan chan<- redis.XMessage), + ctx: ctx, + logger: logger, + callbackFn: callbackFn, + submissionCh: make(chan *streamSorterSubmission), + registerOutCh: make(chan chan<- redis.XMessage), + closeOutCh: make(chan chan<- redis.XMessage), } - _ = context.AfterFunc(ctx, func() { - close(sorter.submissionCh) - close(sorter.closeChSubmission) - close(sorter.closeChQueue) - }) - - ch := make(chan *streamSorterSubmission) - go sorter.submissionWorker(ch) - go sorter.queueWorker(ch) + go sorter.worker() return sorter } -// submissionWorker listens ton submissionCh populated by Submit, fills the heap and ejects streamSorterSubmissions into -// 
out, linked to the queueWorker goroutine for further processing. -func (sorter *StreamSorter) submissionWorker(out chan<- *streamSorterSubmission) { - defer close(out) +// startCallback initiates the callback in a background goroutine and returns a channel that is closed once the callback +// has succeeded or sorter.ctx is done. The callback is retried with a backoff until it returns true. +func (sorter *StreamSorter) startCallback(msg redis.XMessage, key string) <-chan struct{} { + callbackCh := make(chan struct{}) - // When a streamSorterSubmission is created in the Submit method, the current time.Time is added to the struct. + go func() { + defer close(callbackCh) + + const callbackMaxDelay = 10 * time.Second + callbackDelay := time.Duration(0) + + for { + select { + case <-sorter.ctx.Done(): + return + case <-time.After(callbackDelay): + } + + start := time.Now() + success := sorter.callbackFn(msg, key) + + if sorter.verbose { + sorter.logger.Debugw("Callback finished", + zap.String("id", msg.ID), + zap.Bool("success", success), + zap.Duration("duration", time.Since(start)), + zap.Duration("next-delay", callbackDelay)) + } + + if success { + return + } else { + callbackDelay = min(2*max(time.Millisecond, callbackDelay), callbackMaxDelay) + } + + } + }() + + return callbackCh +} + +// worker owns the submission heap and all registered output channels and runs the callbacks one at a time. +func (sorter *StreamSorter) worker() { + // When a streamSorterSubmission is created in the submit method, the current time.Time is added to the struct. // Only if the submission was at least three seconds (submissionMinAge) ago, a popped submission from the heap will // be forwarded to the other goroutine for future processing. 
const submissionMinAge = 3 * time.Second - submissionHeap := &streamSorterSubmissions{} + var submissionHeap streamSorterSubmissions - ticker := time.NewTicker(time.Second) - defer ticker.Stop() + type OutputState struct { + pending int + close bool + } + + registeredOutputs := make(map[chan<- redis.XMessage]*OutputState) + + // Close all registered outputs when we exit. + defer func() { + for out := range registeredOutputs { + close(out) + } + }() + + var runningSubmission *streamSorterSubmission + var runningCallbackCh <-chan struct{} for { - select { - case <-sorter.ctx.Done(): - return + // Sanity check + if (runningSubmission == nil) != (runningCallbackCh == nil) { + panic(fmt.Sprintf("inconsistent state: runningSubmission=%#v and runningCallbackCh=%#v", runningSubmission, runningCallbackCh)) + } - case sub, ok := <-sorter.submissionCh: - if !ok { - return + var nextSubmissionDue <-chan time.Time + if runningCallbackCh == nil { + if next := submissionHeap.Peek(); next != nil { + if submissionAge := time.Since(next.submitTime); submissionAge >= submissionMinAge { + runningCallbackCh = sorter.startCallback(next.msg, next.key) + runningSubmission = next + heap.Pop(&submissionHeap) + } else { + nextSubmissionDue = time.After(submissionMinAge - submissionAge) + } + } + } + + select { + case out := <-sorter.registerOutCh: + if sorter.verbose { + sorter.logger.Debugw("worker: register output", zap.String("out", fmt.Sprint(out))) + } + if _, ok := registeredOutputs[out]; ok { + panic("attempting to register the same output channel twice") + } + registeredOutputs[out] = &OutputState{} + // This function is now responsible for closing out. 
+ + case out := <-sorter.closeOutCh: + if sorter.verbose { + sorter.logger.Debugw("worker: request close output", zap.String("out", fmt.Sprint(out))) + } + if state := registeredOutputs[out]; state == nil { + panic("requested to close unknown output channel") + } else if state.pending > 0 { + // Still pending work, mark the output and wait for it to complete. + state.close = true + } else { + // Output can be closed and unregistered immediately + close(out) + delete(registeredOutputs, out) } + case sub := <-sorter.submissionCh: if sorter.verbose { sorter.logger.Debugw("Push submission to heap", zap.Object("submission", sub)) } - heap.Push(submissionHeap, sub) - - case ch, ok := <-sorter.closeChSubmission: - if !ok { - return + if state := registeredOutputs[sub.out]; state == nil { + panic("submission for an unknown output channel") + } else { + state.pending++ + heap.Push(&submissionHeap, sub) } - bkp := &streamSorterSubmissions{} - for submissionHeap.Len() > 0 { - x := heap.Pop(submissionHeap) - sub, ok := x.(*streamSorterSubmission) - if !ok { - panic(fmt.Sprintf("invalid type %T from submission heap", x)) - } + case <-nextSubmissionDue: + // Loop around to start processing the next submission. 
+ continue - if sub.out == ch { - continue - } - - bkp.Push(sub) - } - submissionHeap = bkp - - case <-ticker.C: - start := time.Now() - submissionCounter := 0 - - for submissionHeap.Len() > 0 { - if peek := (*submissionHeap)[0]; time.Since(peek.submitTime) < submissionMinAge { - if sorter.verbose { - sorter.logger.Debugw("Stopped popping heap as submission is not old enough", - zap.Object("submission", peek), - zap.Int("submissions", submissionCounter), - zap.Duration("duration", time.Since(start))) - } - break - } - - x := heap.Pop(submissionHeap) - sub, ok := x.(*streamSorterSubmission) - if !ok { - panic(fmt.Sprintf("invalid type %T from submission heap", x)) - } - - out <- sub - submissionCounter++ + case <-runningCallbackCh: + out := runningSubmission.out + out <- runningSubmission.msg + state := registeredOutputs[out] + state.pending-- + if state.close && state.pending == 0 { + close(out) + delete(registeredOutputs, out) } - if sorter.verbose && submissionCounter > 0 { - sorter.logger.Debugw("Ejected submissions to callback worker", - zap.Int("submissions", submissionCounter), - zap.Duration("duration", time.Since(start))) - } + runningCallbackCh = nil + runningSubmission = nil + + case <-sorter.ctx.Done(): + return } } } -// queueWorker receives sorted streamSorterSubmissions from submissionWorker and forwards them to the callback. -func (sorter *StreamSorter) queueWorker(in <-chan *streamSorterSubmission) { - // Each streamSorterSubmission received from "in" is stored in the queue slice. From there on, the slice head is - // passed to the callback function. - queue := make([]*streamSorterSubmission, 0, 1024) - - // The actual callback function is executed concurrently as it might block longer than expected. A blocking select - // would result in the queue not being populated, effectively blocking the submissionWorker. Thus, the callbackFn is - // started in a goroutine, signaling back its success status via callbackCh. 
If no callback is active, the channel - // is nil. Furthermore, an exponential backoff for sequentially failing callbacks is in place. - const callbackMaxDelay = 10 * time.Second - var callbackDelay time.Duration - var callbackCh chan bool - callbackFn := func(submission *streamSorterSubmission) { - select { - case <-sorter.ctx.Done(): - return - case <-time.After(callbackDelay): - } - - start := time.Now() - success := sorter.callbackFn(submission.msg, submission.args) - if success { - defer func() { - // Ensure not to panic if the out channel was closed via CloseOutput in the meantime. - if r := recover(); r != nil { - sorter.logger.Error("Recovered from sending submission", zap.Any("recovery", r)) - } - }() - - submission.out <- submission.msg - callbackDelay = 0 - } else { - callbackDelay = min(2*max(time.Millisecond, callbackDelay), callbackMaxDelay) - } - - if sorter.verbose { - sorter.logger.Debugw("Callback finished", - zap.String("id", submission.msg.ID), - zap.Bool("success", success), - zap.Duration("duration", time.Since(start)), - zap.Duration("next-delay", callbackDelay)) - } - - callbackCh <- success - } - - for { - if len(queue) > 0 && callbackCh == nil { - callbackCh = make(chan bool) - go callbackFn(queue[0]) - } - - select { - case <-sorter.ctx.Done(): - return - - case sub, ok := <-in: - if !ok { - return - } - - queue = append(queue, sub) - - if sorter.verbose { - sorter.logger.Debugw("Queue worker received new submission", - zap.Object("submission", sub), - zap.Int("queue-size", len(queue))) - } - - case ch, ok := <-sorter.closeChQueue: - if !ok { - return - } - - queue = slices.DeleteFunc(queue, func(sub *streamSorterSubmission) bool { - return sub.out == ch - }) - - case success := <-callbackCh: - // The len(queue) part is necessary as sorter.closeChQueue might interfere. 
- if success && len(queue) > 0 { - queue = queue[1:] - } - - close(callbackCh) - callbackCh = nil - - if sorter.verbose && len(queue) == 0 { - sorter.logger.Debug("Queue worker finished processing queue") - } - } - } -} - -// Submit a [redis.XMessage] to the StreamSorter. -// -// After the message was sorted and successfully passed to the callback including the optional args, it will be -// forwarded to the out channel. -// -// This method returns an error for malformed Redis Stream IDs or if the internal submission channel blocks for over a -// second. Usually, this both should not happen. -func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redis.XMessage) error { +// submit a [redis.XMessage] to the StreamSorter. +func (sorter *StreamSorter) submit(msg redis.XMessage, key string, out chan<- redis.XMessage) error { ms, seq, err := parseRedisStreamId(msg.ID) if err != nil { return errors.Wrap(err, "cannot parse Redis Stream ID") @@ -351,7 +314,7 @@ func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redi submission := &streamSorterSubmission{ msg: msg, - args: args, + key: key, out: out, streamIdMs: ms, streamIdSeq: seq, @@ -370,28 +333,64 @@ func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redi } } -// CloseOutput clears all submissions targeting this output channel and closes the channel afterwards. +// PipelineFunc implements the interface expected for a history sync pipeline stage. // -// This will only result in submissions with this out channel to be removed from both the submissionWorker's heap and -// the queueWorker's queue. In case such a submission is already in the actual submission process, it might still be -// tried, but sending it to the out channel is recovered internally. -// -// As filtering/recreating the caches is potentially expensive, only call this method if required. In the current -// architecture of sync.go, this is fine. 
-func (sorter *StreamSorter) CloseOutput(out chan<- redis.XMessage) error { - for _, ch := range []chan chan<- redis.XMessage{sorter.closeChSubmission, sorter.closeChQueue} { - select { - case <-sorter.ctx.Done(): - return sorter.ctx.Err() +// This method of a single StreamSorter can be inserted into multiple history sync pipelines and will forward all +// messages from in to out as expected from a pipeline stage. In between, all messages are processed by the +// StreamSorter, which correlates the messages from different pipelines and additionally passes them to a callback +// according to its specification (see the comment on the StreamSorter type). +func (sorter *StreamSorter) PipelineFunc( + ctx context.Context, + s Sync, + key string, + in <-chan redis.XMessage, + out chan<- redis.XMessage, +) error { - case ch <- out: + // Register output channel with worker. + select { + case sorter.registerOutCh <- out: + // Success, worker is now responsible for closing the channel. - case <-time.After(time.Second): - return errors.New("sending to channel for closing timed out") - } + case <-ctx.Done(): + close(out) + return ctx.Err() + + case <-sorter.ctx.Done(): + close(out) + return sorter.ctx.Err() } - close(out) + // If we exit, signal to the worker that no more work for this channel will be submitted. + defer func() { + select { + case sorter.closeOutCh <- out: + // Success, worker will close the output channel eventually. - return nil + case <-sorter.ctx.Done(): + // Worker will quit entirely, closing all output channels. 
+ } + }() + + for { + select { + case msg, ok := <-in: + if !ok { + return nil + } + + err := sorter.submit(msg, key, out) + if err != nil { + s.logger.Errorw("Failed to submit Redis stream event to stream sorter", + zap.String("key", key), + zap.Error(err)) + } + + case <-ctx.Done(): + return ctx.Err() + + case <-sorter.ctx.Done(): + return sorter.ctx.Err() + } + } } diff --git a/pkg/icingadb/history/sorter_test.go b/pkg/icingadb/history/sorter_test.go index b6c6fd6b..f1439910 100644 --- a/pkg/icingadb/history/sorter_test.go +++ b/pkg/icingadb/history/sorter_test.go @@ -3,6 +3,7 @@ package history import ( "cmp" + "context" "fmt" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/redis" @@ -192,7 +193,6 @@ func TestStreamSorter(t *testing.T) { producersEarlyClose: 5, callbackMaxDelayMs: 1000, callbackSuccessPercent: 100, - expectTimeout: true, }, { name: "pure chaos", @@ -211,7 +211,7 @@ func TestStreamSorter(t *testing.T) { var ( callbackCollection []string callbackCollectionMutex sync.Mutex - callbackFn = func(msg redis.XMessage, _ any) bool { + callbackFn = func(msg redis.XMessage, _ string) bool { if tt.callbackMaxDelayMs > 0 { time.Sleep(time.Duration(rand.Int63n(int64(tt.callbackMaxDelayMs))) * time.Millisecond) } @@ -260,9 +260,14 @@ func TestStreamSorter(t *testing.T) { for i := range tt.producers { earlyClose := i < tt.producersEarlyClose + in := make(chan redis.XMessage) out := make(chan redis.XMessage) + go func() { + require.NoError(t, sorter.PipelineFunc(context.Background(), Sync{}, "", in, out)) + }() + if !earlyClose { - defer func() { _ = sorter.CloseOutput(out) }() // no leakage, general cleanup + defer close(in) // no leakage, general cleanup } go func() { @@ -295,12 +300,12 @@ func TestStreamSorter(t *testing.T) { } msg := redis.XMessage{ID: fmt.Sprintf("%d-%d", ms, seq)} - require.NoError(t, sorter.Submit(msg, nil, out)) + in <- msg // 25% chance of closing for early closing producers if earlyClose && 
rand.Int63n(4) == 3 { - require.NoError(t, sorter.CloseOutput(out)) - t.Log("Successfully closed producer early") + close(in) + t.Log("closed producer early") return } } diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 1b248b58..7d7f7dd1 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -449,7 +449,7 @@ func makeSortedCallbackStageFunc( keyStructPtrs map[string]any, fn func(database.Entity) bool, ) stageFunc { - sorterCallbackFn := func(msg redis.XMessage, args any) bool { + sorterCallbackFn := func(msg redis.XMessage, key string) bool { makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) { structPtr, ok := keyStructPtrs[key] if !ok { @@ -473,12 +473,6 @@ func makeSortedCallbackStageFunc( return entity, nil } - key, ok := args.(string) - if !ok { - // Shall not happen; set to string some thirty lines below - panic(fmt.Sprintf("args is of type %T, not string", args)) - } - entity, err := makeEntity(key, msg.Values) if err != nil { logger.Errorw("Failed to create database.Entity out of Redis stream message", @@ -495,36 +489,7 @@ func makeSortedCallbackStageFunc( return success } - sorter := NewStreamSorter(ctx, logger, sorterCallbackFn) - - return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { - defer func() { - if err := sorter.CloseOutput(out); err != nil { - s.logger.Errorw("Closing stream sorter output failed", - zap.String("key", key), - zap.Error(err)) - } - }() - - for { - select { - case msg, ok := <-in: - if !ok { - return nil - } - - err := sorter.Submit(msg, key, out) - if err != nil { - s.logger.Errorw("Failed to submit Redis stream event to stream sorter", - zap.String("key", key), - zap.Error(err)) - } - - case <-ctx.Done(): - return ctx.Err() - } - } - } + return NewStreamSorter(ctx, logger, sorterCallbackFn).PipelineFunc } const (