StreamSorter: improve output channel close behavior and simplify implementation

This commit is essentially an overhaul of the implementation, allowing for a
more straightforward way to close the output channel. The main changes to the
implementation are:

- StreamSorter now provides a method PipelineFunc that can be used directly in
  a history sync pipeline. This allows StreamSorter to handle the in + out
  stream pair internally, so that it closes out after in has been closed and
  all messages from it have been passed to out (see the sketch after this list).
- The two worker goroutines were combined into a single one and the secondary
  queue was removed. All pending messages remain in the heap and will only be
  removed from the heap when they are about to be passed to the callback.
- The worker now handles all operations (send and close) on the output stream.
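
For illustration, a rough sketch of the new wiring, mirroring the updated test and makeSortedCallbackStageFunc below; this assumes the history package context, and ctx, logger, and the stream key "example-stream" are placeholders:

sorter := NewStreamSorter(ctx, logger, func(msg redis.XMessage, key string) bool {
	// Handle msg for the Redis stream named by key; returning false makes
	// the sorter retry this callback with an exponential backoff.
	return true
})

in := make(chan redis.XMessage)
out := make(chan redis.XMessage) // must be drained, e.g. by the next pipeline stage
go func() {
	// Forwards in to out in sorted order. After in was closed and all pending
	// submissions for out were processed, the worker closes out.
	_ = sorter.PipelineFunc(ctx, Sync{}, "example-stream", in, out)
}()
// ... send messages into in ...
close(in) // eventually causes the worker to close out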
Julian Brost 2025-11-06 09:40:25 +01:00 committed by Alvar Penning
parent 18518cf813
commit 6569487fbb
3 changed files with 234 additions and 265 deletions


@@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"slices"
"strconv"
"strings"
"time"
@@ -44,9 +43,9 @@ func parseRedisStreamId(redisStreamId string) (int64, int64, error) {
// are not precise enough.
type streamSorterSubmission struct {
// msg is the Redis message to be forwarded to out after this submission was sorted.
msg redis.XMessage
args any
out chan<- redis.XMessage
msg redis.XMessage
key string
out chan<- redis.XMessage
// Required for sorting.
streamIdMs int64 // streamIdMs is the Redis Stream ID timestamp part (milliseconds)
@@ -55,10 +54,11 @@ type streamSorterSubmission struct {
}
// MarshalLogObject implements [zapcore.ObjectMarshaler].
func (sub streamSorterSubmission) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
func (sub *streamSorterSubmission) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddInt64("redis-id-ms", sub.streamIdMs)
encoder.AddInt64("redis-id-seq", sub.streamIdSeq)
encoder.AddTime("submit-time", sub.submitTime)
encoder.AddString("out", fmt.Sprint(sub.out))
return nil
}
@@ -103,26 +103,37 @@ func (subs *streamSorterSubmissions) Pop() any {
return x
}
// StreamSorter accepts multiple [redis.XMessage] via Submit and ejects them in an ordered fashion.
// Peek returns the smallest element from the heap without removing it, or nil if the heap is empty.
func (subs streamSorterSubmissions) Peek() *streamSorterSubmission {
if len(subs) > 0 {
return subs[0]
} else {
return nil
}
}
// StreamSorter is a helper that can be used to intercept messages from different history sync pipelines and pass them
// to a callback in the order given by their Redis Stream ID (sorted across all involved streams).
//
// Internally, two goroutines are used. The first one collects the submissions and sorts them into a heap based on the
// Redis Stream ID. After being in the heap for at least three seconds, a submission is forwarded to the other
// goroutine. There, each element is passed to the callback function in order. Only if the callback function has
// succeeded, it is removed from the top of the queue.
// After a message is received, it is kept in a priority queue for three seconds to wait for possible messages from
// another stream with a smaller ID. Thus, if a message arrives delayed by more than three seconds, it will be relayed
// out of order. The StreamSorter is only able to ensure order up to a certain degree of chaos.
//
// Thus, if a message is received delayed for more than three seconds, it will be relayed out of order. The StreamSorter
// is only able to ensure order to a certain degree of chaos.
//
// The callback function receives the [redis.XMessage] together with generic args passed in Submit for additional
// context. If the callback function returns true, the element will be removed from the queue. Otherwise, the element
// will be kept at top of the queue and retried next time.
// The callback function receives the [redis.XMessage] together with the Redis stream name (key) for additional
// context. The callback function is supposed to return true on success. Otherwise, the callback will be retried until
// it succeeds.
type StreamSorter struct {
ctx context.Context
logger *logging.Logger
callbackFn func(redis.XMessage, any) bool
submissionCh chan *streamSorterSubmission
closeChSubmission chan chan<- redis.XMessage
closeChQueue chan chan<- redis.XMessage
ctx context.Context
logger *logging.Logger
callbackFn func(redis.XMessage, string) bool
submissionCh chan *streamSorterSubmission
// registerOutCh is used by PipelineFunc() to register output channels with worker()
registerOutCh chan chan<- redis.XMessage
// closeOutCh is used by PipelineFunc() to signal to worker() that there will be no more submissions destined for
// that output channel and it can be closed by the worker after it processed all pending submissions for it.
closeOutCh chan chan<- redis.XMessage
// verbose enables verbose debug logging. One probably doesn't want this outside of the tests.
verbose bool
@@ -132,218 +143,170 @@ type StreamSorter struct {
func NewStreamSorter(
ctx context.Context,
logger *logging.Logger,
callbackFn func(msg redis.XMessage, args any) bool,
callbackFn func(msg redis.XMessage, key string) bool,
) *StreamSorter {
sorter := &StreamSorter{
ctx: ctx,
logger: logger,
callbackFn: callbackFn,
submissionCh: make(chan *streamSorterSubmission),
closeChSubmission: make(chan chan<- redis.XMessage),
closeChQueue: make(chan chan<- redis.XMessage),
ctx: ctx,
logger: logger,
callbackFn: callbackFn,
submissionCh: make(chan *streamSorterSubmission),
registerOutCh: make(chan chan<- redis.XMessage),
closeOutCh: make(chan chan<- redis.XMessage),
}
_ = context.AfterFunc(ctx, func() {
close(sorter.submissionCh)
close(sorter.closeChSubmission)
close(sorter.closeChQueue)
})
ch := make(chan *streamSorterSubmission)
go sorter.submissionWorker(ch)
go sorter.queueWorker(ch)
go sorter.worker()
return sorter
}
// submissionWorker listens ton submissionCh populated by Submit, fills the heap and ejects streamSorterSubmissions into
// out, linked to the queueWorker goroutine for further processing.
func (sorter *StreamSorter) submissionWorker(out chan<- *streamSorterSubmission) {
defer close(out)
// startCallback initiates the callback in a background goroutine and returns a channel that is closed once the callback
// has succeeded. It retries the callback with a backoff until it signals success by returning true.
func (sorter *StreamSorter) startCallback(msg redis.XMessage, key string) <-chan struct{} {
callbackCh := make(chan struct{})
// When a streamSorterSubmission is created in the Submit method, the current time.Time is added to the struct.
go func() {
defer close(callbackCh)
const callbackMaxDelay = 10 * time.Second
callbackDelay := time.Duration(0)
for {
select {
case <-sorter.ctx.Done():
return
case <-time.After(callbackDelay):
}
start := time.Now()
success := sorter.callbackFn(msg, key)
if sorter.verbose {
sorter.logger.Debugw("Callback finished",
zap.String("id", msg.ID),
zap.Bool("success", success),
zap.Duration("duration", time.Since(start)),
zap.Duration("next-delay", callbackDelay))
}
if success {
return
} else {
callbackDelay = min(2*max(time.Millisecond, callbackDelay), callbackMaxDelay)
}
}
}()
return callbackCh
}
// worker is the single goroutine that sorts submissions by their Redis Stream ID, starts the callback for the oldest
// submission once it has been in the heap long enough, forwards successfully processed messages to their output
// channels, and closes each output channel once its close was requested and no pending submissions remain for it.
func (sorter *StreamSorter) worker() {
// When a streamSorterSubmission is created in the submit method, the current time.Time is added to the struct.
// Only if the submission was made at least three seconds (submissionMinAge) ago will it be popped from the heap and
// passed to the callback.
const submissionMinAge = 3 * time.Second
submissionHeap := &streamSorterSubmissions{}
var submissionHeap streamSorterSubmissions
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
type OutputState struct {
pending int
close bool
}
registeredOutputs := make(map[chan<- redis.XMessage]*OutputState)
// Close all registered outputs when we exit.
defer func() {
for out := range registeredOutputs {
close(out)
}
}()
var runningSubmission *streamSorterSubmission
var runningCallbackCh <-chan struct{}
for {
select {
case <-sorter.ctx.Done():
return
// Sanity check
if (runningSubmission == nil) != (runningCallbackCh == nil) {
panic(fmt.Sprintf("inconsistent state: runningSubmission=%#v and runningCallbackCh=%#v", runningSubmission, runningCallbackCh))
}
case sub, ok := <-sorter.submissionCh:
if !ok {
return
var nextSubmissionDue <-chan time.Time
if runningCallbackCh == nil {
if next := submissionHeap.Peek(); next != nil {
if submissionAge := time.Since(next.submitTime); submissionAge >= submissionMinAge {
runningCallbackCh = sorter.startCallback(next.msg, next.key)
runningSubmission = next
heap.Pop(&submissionHeap)
} else {
nextSubmissionDue = time.After(submissionMinAge - submissionAge)
}
}
}
select {
case out := <-sorter.registerOutCh:
if sorter.verbose {
sorter.logger.Debugw("worker: register output", zap.String("out", fmt.Sprint(out)))
}
if _, ok := registeredOutputs[out]; ok {
panic("attempting to register the same output channel twice")
}
registeredOutputs[out] = &OutputState{}
// This function is now responsible for closing out.
case out := <-sorter.closeOutCh:
if sorter.verbose {
sorter.logger.Debugw("worker: request close output", zap.String("out", fmt.Sprint(out)))
}
if state := registeredOutputs[out]; state == nil {
panic("requested to close unknown output channel")
} else if state.pending > 0 {
// Still pending work, mark the output and wait for it to complete.
state.close = true
} else {
// Output can be closed and unregistered immediately
close(out)
delete(registeredOutputs, out)
}
case sub := <-sorter.submissionCh:
if sorter.verbose {
sorter.logger.Debugw("Push submission to heap", zap.Object("submission", sub))
}
heap.Push(submissionHeap, sub)
case ch, ok := <-sorter.closeChSubmission:
if !ok {
return
if state := registeredOutputs[sub.out]; state == nil {
panic("submission for an unknown output channel")
} else {
state.pending++
heap.Push(&submissionHeap, sub)
}
bkp := &streamSorterSubmissions{}
for submissionHeap.Len() > 0 {
x := heap.Pop(submissionHeap)
sub, ok := x.(*streamSorterSubmission)
if !ok {
panic(fmt.Sprintf("invalid type %T from submission heap", x))
}
case <-nextSubmissionDue:
// Loop around to start processing the next submission.
continue
if sub.out == ch {
continue
}
bkp.Push(sub)
}
submissionHeap = bkp
case <-ticker.C:
start := time.Now()
submissionCounter := 0
for submissionHeap.Len() > 0 {
if peek := (*submissionHeap)[0]; time.Since(peek.submitTime) < submissionMinAge {
if sorter.verbose {
sorter.logger.Debugw("Stopped popping heap as submission is not old enough",
zap.Object("submission", peek),
zap.Int("submissions", submissionCounter),
zap.Duration("duration", time.Since(start)))
}
break
}
x := heap.Pop(submissionHeap)
sub, ok := x.(*streamSorterSubmission)
if !ok {
panic(fmt.Sprintf("invalid type %T from submission heap", x))
}
out <- sub
submissionCounter++
case <-runningCallbackCh:
out := runningSubmission.out
out <- runningSubmission.msg
state := registeredOutputs[out]
state.pending--
if state.close && state.pending == 0 {
close(out)
delete(registeredOutputs, out)
}
if sorter.verbose && submissionCounter > 0 {
sorter.logger.Debugw("Ejected submissions to callback worker",
zap.Int("submissions", submissionCounter),
zap.Duration("duration", time.Since(start)))
}
runningCallbackCh = nil
runningSubmission = nil
case <-sorter.ctx.Done():
return
}
}
}
// queueWorker receives sorted streamSorterSubmissions from submissionWorker and forwards them to the callback.
func (sorter *StreamSorter) queueWorker(in <-chan *streamSorterSubmission) {
// Each streamSorterSubmission received from "in" is stored in the queue slice. From there on, the slice head is
// passed to the callback function.
queue := make([]*streamSorterSubmission, 0, 1024)
// The actual callback function is executed concurrently as it might block longer than expected. A blocking select
// would result in the queue not being populated, effectively blocking the submissionWorker. Thus, the callbackFn is
// started in a goroutine, signaling back its success status via callbackCh. If no callback is active, the channel
// is nil. Furthermore, an exponential backoff for sequentially failing callbacks is in place.
const callbackMaxDelay = 10 * time.Second
var callbackDelay time.Duration
var callbackCh chan bool
callbackFn := func(submission *streamSorterSubmission) {
select {
case <-sorter.ctx.Done():
return
case <-time.After(callbackDelay):
}
start := time.Now()
success := sorter.callbackFn(submission.msg, submission.args)
if success {
defer func() {
// Ensure not to panic if the out channel was closed via CloseOutput in the meantime.
if r := recover(); r != nil {
sorter.logger.Error("Recovered from sending submission", zap.Any("recovery", r))
}
}()
submission.out <- submission.msg
callbackDelay = 0
} else {
callbackDelay = min(2*max(time.Millisecond, callbackDelay), callbackMaxDelay)
}
if sorter.verbose {
sorter.logger.Debugw("Callback finished",
zap.String("id", submission.msg.ID),
zap.Bool("success", success),
zap.Duration("duration", time.Since(start)),
zap.Duration("next-delay", callbackDelay))
}
callbackCh <- success
}
for {
if len(queue) > 0 && callbackCh == nil {
callbackCh = make(chan bool)
go callbackFn(queue[0])
}
select {
case <-sorter.ctx.Done():
return
case sub, ok := <-in:
if !ok {
return
}
queue = append(queue, sub)
if sorter.verbose {
sorter.logger.Debugw("Queue worker received new submission",
zap.Object("submission", sub),
zap.Int("queue-size", len(queue)))
}
case ch, ok := <-sorter.closeChQueue:
if !ok {
return
}
queue = slices.DeleteFunc(queue, func(sub *streamSorterSubmission) bool {
return sub.out == ch
})
case success := <-callbackCh:
// The len(queue) part is necessary as sorter.closeChQueue might interfere.
if success && len(queue) > 0 {
queue = queue[1:]
}
close(callbackCh)
callbackCh = nil
if sorter.verbose && len(queue) == 0 {
sorter.logger.Debug("Queue worker finished processing queue")
}
}
}
}
// Submit a [redis.XMessage] to the StreamSorter.
//
// After the message was sorted and successfully passed to the callback including the optional args, it will be
// forwarded to the out channel.
//
// This method returns an error for malformed Redis Stream IDs or if the internal submission channel blocks for over a
// second. Usually, this both should not happen.
func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redis.XMessage) error {
// submit a [redis.XMessage] to the StreamSorter.
func (sorter *StreamSorter) submit(msg redis.XMessage, key string, out chan<- redis.XMessage) error {
ms, seq, err := parseRedisStreamId(msg.ID)
if err != nil {
return errors.Wrap(err, "cannot parse Redis Stream ID")
@@ -351,7 +314,7 @@ func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redi
submission := &streamSorterSubmission{
msg: msg,
args: args,
key: key,
out: out,
streamIdMs: ms,
streamIdSeq: seq,
@@ -370,28 +333,64 @@ func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redi
}
}
// CloseOutput clears all submissions targeting this output channel and closes the channel afterwards.
// PipelineFunc implements the interface expected for a history sync pipeline stage.
//
// This will only result in submissions with this out channel to be removed from both the submissionWorker's heap and
// the queueWorker's queue. In case such a submission is already in the actual submission process, it might still be
// tried, but sending it to the out channel is recovered internally.
//
// As filtering/recreating the caches is potentially expensive, only call this method if required. In the current
// architecture of sync.go, this is fine.
func (sorter *StreamSorter) CloseOutput(out chan<- redis.XMessage) error {
for _, ch := range []chan chan<- redis.XMessage{sorter.closeChSubmission, sorter.closeChQueue} {
select {
case <-sorter.ctx.Done():
return sorter.ctx.Err()
// This method of a single StreamSorter can be inserted into multiple history sync pipelines and will forward all
// messages from in to out as expected from a pipeline stage. In between, all messages are processed by the
// StreamSorter, which correlates the messages from different pipelines and additionally passes them to a callback
// according to its specification (see the comment on the StreamSorter type).
func (sorter *StreamSorter) PipelineFunc(
ctx context.Context,
s Sync,
key string,
in <-chan redis.XMessage,
out chan<- redis.XMessage,
) error {
case ch <- out:
// Register output channel with worker.
select {
case sorter.registerOutCh <- out:
// Success, worker is now responsible for closing the channel.
case <-time.After(time.Second):
return errors.New("sending to channel for closing timed out")
}
case <-ctx.Done():
close(out)
return ctx.Err()
case <-sorter.ctx.Done():
close(out)
return sorter.ctx.Err()
}
close(out)
// If we exit, signal to the worker that no more work for this channel will be submitted.
defer func() {
select {
case sorter.closeOutCh <- out:
// Success, worker will close the output channel eventually.
return nil
case <-sorter.ctx.Done():
// Worker will quit entirely, closing all output channels.
}
}()
for {
select {
case msg, ok := <-in:
if !ok {
return nil
}
err := sorter.submit(msg, key, out)
if err != nil {
s.logger.Errorw("Failed to submit Redis stream event to stream sorter",
zap.String("key", key),
zap.Error(err))
}
case <-ctx.Done():
return ctx.Err()
case <-sorter.ctx.Done():
return sorter.ctx.Err()
}
}
}
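
A side note on the retry behavior of startCallback above: the delay before each retry starts at zero and then doubles from one millisecond up to the ten-second cap. A small standalone sketch of that progression; the update expression is copied from the diff, the surrounding loop is only illustrative (the min/max builtins need Go 1.21+, which the diff already relies on):

package main

import (
	"fmt"
	"time"
)

func main() {
	const callbackMaxDelay = 10 * time.Second // same cap as in startCallback
	delay := time.Duration(0)
	for attempt := 1; attempt <= 15; attempt++ {
		fmt.Printf("attempt %2d waits %v\n", attempt, delay)
		// Copied from startCallback: double the delay (at least 2ms), capped at 10s.
		delay = min(2*max(time.Millisecond, delay), callbackMaxDelay)
	}
	// Prints 0s, 2ms, 4ms, ..., 8.192s and finally the 10s cap.
}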


@@ -3,6 +3,7 @@ package history
import (
"cmp"
"context"
"fmt"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/redis"
@@ -192,7 +193,6 @@ func TestStreamSorter(t *testing.T) {
producersEarlyClose: 5,
callbackMaxDelayMs: 1000,
callbackSuccessPercent: 100,
expectTimeout: true,
},
{
name: "pure chaos",
@@ -211,7 +211,7 @@ func TestStreamSorter(t *testing.T) {
var (
callbackCollection []string
callbackCollectionMutex sync.Mutex
callbackFn = func(msg redis.XMessage, _ any) bool {
callbackFn = func(msg redis.XMessage, _ string) bool {
if tt.callbackMaxDelayMs > 0 {
time.Sleep(time.Duration(rand.Int63n(int64(tt.callbackMaxDelayMs))) * time.Millisecond)
}
@@ -260,9 +260,14 @@ func TestStreamSorter(t *testing.T) {
for i := range tt.producers {
earlyClose := i < tt.producersEarlyClose
in := make(chan redis.XMessage)
out := make(chan redis.XMessage)
go func() {
require.NoError(t, sorter.PipelineFunc(context.Background(), Sync{}, "", in, out))
}()
if !earlyClose {
defer func() { _ = sorter.CloseOutput(out) }() // no leakage, general cleanup
defer close(in) // no leakage, general cleanup
}
go func() {
@@ -295,12 +300,12 @@ func TestStreamSorter(t *testing.T) {
}
msg := redis.XMessage{ID: fmt.Sprintf("%d-%d", ms, seq)}
require.NoError(t, sorter.Submit(msg, nil, out))
in <- msg
// 25% chance of closing for early closing producers
if earlyClose && rand.Int63n(4) == 3 {
require.NoError(t, sorter.CloseOutput(out))
t.Log("Successfully closed producer early")
close(in)
t.Log("closed producer early")
return
}
}


@@ -449,7 +449,7 @@ func makeSortedCallbackStageFunc(
keyStructPtrs map[string]any,
fn func(database.Entity) bool,
) stageFunc {
sorterCallbackFn := func(msg redis.XMessage, args any) bool {
sorterCallbackFn := func(msg redis.XMessage, key string) bool {
makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) {
structPtr, ok := keyStructPtrs[key]
if !ok {
@@ -473,12 +473,6 @@ func makeSortedCallbackStageFunc(
return entity, nil
}
key, ok := args.(string)
if !ok {
// Shall not happen; set to string some thirty lines below
panic(fmt.Sprintf("args is of type %T, not string", args))
}
entity, err := makeEntity(key, msg.Values)
if err != nil {
logger.Errorw("Failed to create database.Entity out of Redis stream message",
@@ -495,36 +489,7 @@ func makeSortedCallbackStageFunc(
return success
}
sorter := NewStreamSorter(ctx, logger, sorterCallbackFn)
return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
defer func() {
if err := sorter.CloseOutput(out); err != nil {
s.logger.Errorw("Closing stream sorter output failed",
zap.String("key", key),
zap.Error(err))
}
}()
for {
select {
case msg, ok := <-in:
if !ok {
return nil
}
err := sorter.Submit(msg, key, out)
if err != nil {
s.logger.Errorw("Failed to submit Redis stream event to stream sorter",
zap.String("key", key),
zap.Error(err))
}
case <-ctx.Done():
return ctx.Err()
}
}
}
return NewStreamSorter(ctx, logger, sorterCallbackFn).PipelineFunc
}
const (
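
Since key now arrives as a plain string, the args.(string) assertion removed above is no longer necessary. Purely as a shape reference, a hypothetical minimal callback for this stage could look as follows; makeEntity and fn refer to the helpers visible in the hunks above, and the error-handling policy is illustrative rather than the actual one:

exampleCallback := func(msg redis.XMessage, key string) bool {
	entity, err := makeEntity(key, msg.Values) // key is the Redis stream name
	if err != nil {
		// The real callback logs this error; returning true drops the message,
		// returning false makes the StreamSorter retry with backoff.
		return true
	}
	return fn(entity)
}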