mirror of https://github.com/Icinga/icingadb.git
history: StreamSorter for Notifications Callback
The StreamSorter was added to history, allowing it to collect messages from multiple Redis streams, sort them based on the timestamp in the Stream ID, and eject them again. This is used for the callback stage required by Icinga Notifications, where an ordered stream is necessary. Despite my best intentions, it feels like I have created an Erlang.
This commit is contained in:
parent b8e11b390e
commit 4a4792dfee
3 changed files with 691 additions and 99 deletions
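Before the diff itself, here is a minimal usage sketch (not part of the commit) of how the new API fits together, wired up the same way the test file below does it. The import path for the history package and the zap development logger are assumptions standing in for the daemon's real setup.

// A minimal sketch, assuming the exported StreamSorter API shown in the diff.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history" // assumed import path
	"go.uber.org/zap"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := logging.NewLogger(zap.Must(zap.NewDevelopment()).Sugar(), time.Second)

	// The callback sees messages in Stream ID order. Returning false keeps the
	// message at the head of the queue and retries it with a growing backoff.
	sorter := history.NewStreamSorter(ctx, logger, func(msg redis.XMessage, args any) bool {
		fmt.Println("callback:", msg.ID, args)
		return true
	})

	out := make(chan redis.XMessage)
	go func() {
		// out receives each message only after its callback succeeded.
		for msg := range out {
			fmt.Println("forwarded:", msg.ID)
		}
	}()

	// Submit in the wrong order; the sorter's second-granular buckets restore it.
	now := time.Now().UnixMilli()
	for _, id := range []string{
		fmt.Sprintf("%d-1", now), // same millisecond, higher sequence number
		fmt.Sprintf("%d-0", now), // submitted second, but reaches the callback first
	} {
		if err := sorter.Submit(redis.XMessage{ID: id}, "example", out); err != nil {
			logger.Errorw("Submit failed", zap.Error(err))
		}
	}

	time.Sleep(6 * time.Second) // wait out the three-second bucket window before exiting
}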
pkg/icingadb/history/sorter.go (new file, 321 lines)
@@ -0,0 +1,321 @@
package history

import (
	"context"
	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"math"
	"regexp"
	"sort"
	"strconv"
	"time"
)
// parseRedisStreamId parses a Redis Stream ID and returns the timestamp in ms and the sequence number, or an error.
func parseRedisStreamId(redisStreamId string) (int64, int64, error) {
	re := regexp.MustCompile(`^(\d+)-(\d+)$`)
	matches := re.FindStringSubmatch(redisStreamId)
	if len(matches) != 3 {
		return 0, 0, errors.Errorf("value %q does not satisfy Redis Stream ID regex", redisStreamId)
	}

	ms, err := strconv.ParseInt(matches[1], 10, 64)
	if err != nil {
		return 0, 0, errors.Wrapf(
			err,
			"timestamp part of the Redis Stream ID %q cannot be parsed to int", redisStreamId)
	}

	seq, err := strconv.ParseInt(matches[2], 10, 64)
	if err != nil {
		return 0, 0, errors.Wrapf(
			err,
			"sequence number of the Redis Stream ID %q cannot be parsed to int", redisStreamId)
	}

	return ms, seq, nil
}
// streamSorterSubmission is one submission to a StreamSorter. It can be sorted by the Redis Stream ID - both via the
// timestamp and the sequence number as a fallback - as well as by the submission timestamp for duplicates if
// milliseconds are not precise enough.
type streamSorterSubmission struct {
	// msg is the Redis message to be forwarded to out after this submission was sorted.
	msg redis.XMessage
	// args is passed through to the callback function together with msg.
	args any
	// out receives msg after the callback function has succeeded.
	out chan<- redis.XMessage

	// Required for sorting.
	streamIdMs   int64 // streamIdMs is the Redis Stream ID timestamp part (milliseconds)
	streamIdSeq  int64 // streamIdSeq is the Redis Stream ID sequence number
	submitTimeNs int64 // submitTimeNs is the timestamp when the element was submitted (in nanoseconds)
}
// streamSorterSubmissions implements sort.Interface for []streamSorterSubmission.
type streamSorterSubmissions []streamSorterSubmission

func (subs streamSorterSubmissions) Len() int { return len(subs) }

func (subs streamSorterSubmissions) Swap(i, j int) { subs[i], subs[j] = subs[j], subs[i] }

func (subs streamSorterSubmissions) Less(i, j int) bool {
	a, b := subs[i], subs[j]
	if a.streamIdMs != b.streamIdMs {
		return a.streamIdMs < b.streamIdMs
	}
	if a.streamIdSeq != b.streamIdSeq {
		return a.streamIdSeq < b.streamIdSeq
	}
	return a.submitTimeNs < b.submitTimeNs
}
// StreamSorter accepts multiple [redis.XMessage] via Submit and ejects them in an ordered fashion.
//
// Internally, two goroutines are used. One collects the submissions and puts them into buckets based on the second
// of the Redis Stream ID. After three seconds, each bucket is sorted and ejected to the other goroutine. There,
// each element is passed to the callback function in order. Only after the callback function has succeeded is the
// element removed from the top of the queue.
//
// Thus, if a message is delayed for more than three seconds, it will be relayed out of order while an error is
// logged. The StreamSorter is only able to ensure order up to a certain degree of chaos.
//
// The callback function receives the [redis.XMessage] together with the generic args passed in Submit for additional
// context. If the callback function returns true, the element will be removed from the queue. Otherwise, the element
// will be kept at the top of the queue and retried next time.
type StreamSorter struct {
	logger     *logging.Logger
	callbackFn func(redis.XMessage, any) bool

	submissionCh chan streamSorterSubmission

	// verbose enables verbose debug logging. One probably does not want this outside the tests.
	verbose bool
}
// NewStreamSorter creates a StreamSorter honoring the given context and returning elements to the callback function.
func NewStreamSorter(
	ctx context.Context,
	logger *logging.Logger,
	callbackFn func(msg redis.XMessage, args any) bool,
) *StreamSorter {
	sorter := &StreamSorter{
		logger:       logger,
		callbackFn:   callbackFn,
		submissionCh: make(chan streamSorterSubmission),
	}

	_ = context.AfterFunc(ctx, func() { close(sorter.submissionCh) })

	ch := make(chan []streamSorterSubmission)
	go sorter.submissionWorker(ctx, ch)
	go sorter.queueWorker(ctx, ch)

	return sorter
}
// submissionWorker listens on submissionCh populated by Submit, fills buckets and ejects them into out, linked to
// the queueWorker goroutine for further processing.
func (sorter *StreamSorter) submissionWorker(ctx context.Context, out chan<- []streamSorterSubmission) {
	// slots defines how many second slots should be kept for sorting
	const slots = 3
	// buckets maps a timestamp in seconds to the streamSorterSubmissions made within this second
	buckets := make(map[int64][]streamSorterSubmission)

	defer close(out)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case submission := <-sorter.submissionCh:
			curBucketId := time.Now().Unix()
			bucketId := submission.streamIdMs / 1_000
			if minBucketId := curBucketId - slots; bucketId < minBucketId {
				sorter.logger.Errorw("Received message with Stream ID from the far past, put in last bucket",
					zap.String("id", submission.msg.ID),
					zap.Int64("buckets-behind", minBucketId-bucketId))
				bucketId = minBucketId
			} else if bucketId > curBucketId {
				sorter.logger.Warnw("Received message with Stream ID from the future",
					zap.String("id", submission.msg.ID),
					zap.Int64("buckets-ahead", bucketId-curBucketId))
			}

			if sorter.verbose {
				sorter.logger.Debugw("Insert submission into bucket",
					zap.String("id", submission.msg.ID),
					zap.Int64("bucket-id", bucketId))
			}

			bucket, ok := buckets[bucketId]
			if !ok {
				bucket = make([]streamSorterSubmission, 0, 1)
			}
			buckets[bucketId] = append(bucket, submission)

		case <-ticker.C:
			// Search the smallest bucket ID older than slots+1 seconds by iterating over the keys. This is fast since
			// slots is 3 and the submission code eliminates inserts from the far past. Usually, the latest bucket ID
			// should be "time.Now().Unix() - slots - 1", but I raced this with a very busy submission channel.
			bucketId := int64(math.MaxInt64)
			bucketSup := time.Now().Unix() - slots - 1
			for k := range buckets {
				if k <= bucketSup {
					bucketId = min(bucketId, k)
				}
			}

			bucket, ok := buckets[bucketId]
			if !ok {
				continue
			}
			delete(buckets, bucketId)

			sort.Sort(streamSorterSubmissions(bucket))
			out <- bucket

			if sorter.verbose {
				sorter.logger.Debugw("Ejected submission bucket to callback worker",
					zap.Int64("bucket-id", bucketId),
					zap.Int("bucket-size", len(bucket)))
			}
		}
	}
}
// queueWorker receives sorted streamSorterSubmissions from submissionWorker and forwards them to the callback.
func (sorter *StreamSorter) queueWorker(ctx context.Context, in <-chan []streamSorterSubmission) {
	// Each streamSorterSubmission received bucket-wise from in is stored in the queue slice. From there on, the slice
	// head is passed to the callback function. The queueEventCh has a buffer capacity of 1 to allow both filling and
	// consuming in the same goroutine.
	queue := make([]streamSorterSubmission, 0, 1024)
	queueEventCh := make(chan struct{}, 1)
	defer close(queueEventCh)

	// queueEvent places something in queueEventCh w/o deadlocking
	queueEvent := func() {
		// Always drain the channel first. Consider a positive <-queueEventCh case followed by <-in. Within <-in, a
		// second struct{}{} would be sent, effectively deadlocking.
		for len(queueEventCh) > 0 {
			<-queueEventCh
		}
		queueEventCh <- struct{}{}
	}

	// The actual callback function is executed concurrently as it might block longer than expected. A blocking select
	// would result in the queue not being populated, effectively blocking the submissionWorker. Thus, the callbackFn is
	// started in a goroutine, signaling back its success status via callbackCh. If no callback is active, the channel
	// is nil. Furthermore, an exponential backoff for sequentially failing callbacks is in place.
	const callbackMaxDelay = 10 * time.Second
	var callbackDelay time.Duration
	var callbackCh chan bool
	callbackFn := func(submission streamSorterSubmission) {
		select {
		case <-ctx.Done():
			return
		case <-time.After(callbackDelay):
		}

		start := time.Now()
		success := sorter.callbackFn(submission.msg, submission.args)
		if success {
			submission.out <- submission.msg
			callbackDelay = 0
		} else {
			callbackDelay = min(2*max(time.Millisecond, callbackDelay), callbackMaxDelay)
		}

		if sorter.verbose {
			sorter.logger.Debugw("Callback finished",
				zap.String("id", submission.msg.ID),
				zap.Bool("success", success),
				zap.Duration("duration", time.Since(start)),
				zap.Duration("next-delay", callbackDelay))
		}

		callbackCh <- success
	}

	for {
		select {
		case <-ctx.Done():
			return

		case submissions, ok := <-in:
			if !ok {
				return
			}

			queue = append(queue, submissions...)
			queueEvent()

			if sorter.verbose {
				sorter.logger.Debugw("Queue worker received new submissions",
					zap.Int("queue-size", len(queue)))
			}

		case <-queueEventCh:
			if len(queue) == 0 {
				continue
			}

			if callbackCh != nil {
				continue
			}
			callbackCh = make(chan bool)
			go callbackFn(queue[0])

		case success := <-callbackCh:
			if success {
				queue = queue[1:]
			}

			close(callbackCh)
			callbackCh = nil

			if len(queue) > 0 {
				queueEvent()
			} else if sorter.verbose {
				sorter.logger.Debug("Queue worker finished processing queue")
			}
		}
	}
}
// Submit a [redis.XMessage] to the StreamSorter.
//
// After the message was sorted and successfully passed to the callback, including the optional args, it will be
// forwarded to the out channel.
//
// This method returns an error for malformed Redis Stream IDs or if the internal submission channel blocks for over a
// second. Usually, neither should happen.
func (sorter *StreamSorter) Submit(msg redis.XMessage, args any, out chan<- redis.XMessage) error {
	ms, seq, err := parseRedisStreamId(msg.ID)
	if err != nil {
		return errors.Wrap(err, "cannot parse Redis Stream ID")
	}

	submission := streamSorterSubmission{
		msg:          msg,
		args:         args,
		out:          out,
		streamIdMs:   ms,
		streamIdSeq:  seq,
		submitTimeNs: time.Now().UnixNano(),
	}

	select {
	case sorter.submissionCh <- submission:
		return nil

	case <-time.After(time.Second):
		return errors.New("submission timed out")
	}
}
pkg/icingadb/history/sorter_test.go (new file, 317 lines)
@@ -0,0 +1,317 @@
// #nosec G404 -- Allow math/rand for the tests
package history

import (
	"cmp"
	"fmt"
	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"
	"math/rand"
	"sort"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"
)

func Test_parseRedisStreamId(t *testing.T) {
	tests := []struct {
		name    string
		input   string
		wantMs  int64
		wantSeq int64
		wantErr bool
	}{
		{
			name:  "epoch",
			input: "0-0",
		},
		{
			name:   "valid",
			input:  "1761658169701-0",
			wantMs: 1761658169701,
		},
		{
			name:    "valid sequence",
			input:   "1761658169701-23",
			wantMs:  1761658169701,
			wantSeq: 23,
		},
		{
			name:    "invalid format",
			input:   "23-42-23",
			wantErr: true,
		},
		{
			name:    "invalid field types",
			input:   "0x23-0x42",
			wantErr: true,
		},
		{
			name:    "number too big",
			input:   "22222222222222222222222222222222222222222222222222222222222222222222222222222222222222222-0",
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			gotMs, gotSeq, err := parseRedisStreamId(tt.input)
			require.Equal(t, tt.wantErr, err != nil, "error differs %v", err)
			require.Equal(t, tt.wantMs, gotMs, "ms from Redis Stream ID differs")
			require.Equal(t, tt.wantSeq, gotSeq, "seq from Redis Stream ID differs")
		})
	}
}
func Test_streamSorterSubmissions(t *testing.T) {
	submissions := []streamSorterSubmission{
		{streamIdMs: 0, streamIdSeq: 0, submitTimeNs: 0},
		{streamIdMs: 1, streamIdSeq: 0, submitTimeNs: 0},
		{streamIdMs: 1, streamIdSeq: 1, submitTimeNs: 0},
		{streamIdMs: 2, streamIdSeq: 0, submitTimeNs: 0},
		{streamIdMs: 2, streamIdSeq: 0, submitTimeNs: 1},
		{streamIdMs: 3, streamIdSeq: 0, submitTimeNs: 0},
		{streamIdMs: 3, streamIdSeq: 1, submitTimeNs: 0},
		{streamIdMs: 3, streamIdSeq: 1, submitTimeNs: 1},
		{streamIdMs: 3, streamIdSeq: 1, submitTimeNs: 2},
	}

	submissionsRand := make([]streamSorterSubmission, 0, len(submissions))
	for _, i := range rand.Perm(len(submissions)) {
		submissionsRand = append(submissionsRand, submissions[i])
	}

	sort.Sort(streamSorterSubmissions(submissionsRand))
	require.Equal(t, submissions, submissionsRand)
}
func TestStreamSorter(t *testing.T) {
	tests := []struct {
		name                   string
		messages               int
		producers              int
		callbackMaxDelayMs     int
		callbackSuccessPercent int
		expectTimeout          bool
		outMaxDelayMs          int
	}{
		{
			name:                   "baseline",
			messages:               10,
			producers:              1,
			callbackSuccessPercent: 100,
		},
		{
			name:                   "simple",
			messages:               100,
			producers:              10,
			callbackSuccessPercent: 100,
		},
		{
			name:                   "many producers",
			messages:               100,
			producers:              100,
			callbackSuccessPercent: 100,
		},
		{
			name:                   "many messages",
			messages:               1000,
			producers:              10,
			callbackSuccessPercent: 100,
		},
		{
			name:                   "callback a bit unreliable",
			messages:               50,
			producers:              10,
			callbackSuccessPercent: 70,
		},
		{
			name:                   "callback coin flip",
			messages:               50,
			producers:              10,
			callbackSuccessPercent: 50,
		},
		{
			name:                   "callback unreliable",
			messages:               25,
			producers:              5,
			callbackSuccessPercent: 30,
		},
		{
			name:                   "callback total rejection",
			messages:               10,
			producers:              1,
			callbackSuccessPercent: 0,
			expectTimeout:          true,
		},
		{
			name:                   "callback slow",
			messages:               100,
			producers:              10,
			callbackMaxDelayMs:     3000,
			callbackSuccessPercent: 100,
		},
		{
			name:                   "out slow",
			messages:               100,
			producers:              10,
			callbackSuccessPercent: 100,
			outMaxDelayMs:          1000,
		},
		{
			name:                   "pure chaos",
			messages:               50,
			producers:              10,
			callbackMaxDelayMs:     3000,
			callbackSuccessPercent: 50,
			outMaxDelayMs:          1000,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			// Callback function invoked after reordering
			var (
				callbackCollection      []string
				callbackCollectionMutex sync.Mutex
				callbackFn              = func(msg redis.XMessage, _ any) bool {
					if tt.callbackMaxDelayMs > 0 {
						time.Sleep(time.Duration(rand.Int63n(int64(tt.callbackMaxDelayMs))) * time.Millisecond)
					}

					if rand.Int63n(100)+1 > int64(tt.callbackSuccessPercent) {
						return false
					}

					callbackCollectionMutex.Lock()
					defer callbackCollectionMutex.Unlock()
					callbackCollection = append(callbackCollection, msg.ID)
					return true
				}
			)

			// Out channel after reordering and callback
			var (
				outCounterCh = make(chan struct{})
				outConsumer  = func(out chan redis.XMessage) {
					for {
						if tt.outMaxDelayMs > 0 {
							time.Sleep(time.Duration(rand.Int63n(int64(tt.outMaxDelayMs))) * time.Millisecond)
						}

						_, ok := <-out
						if !ok {
							return
						}
						outCounterCh <- struct{}{}
					}
				}
			)

			// Decreasing counter for messages to be sent
			var (
				inCounter      = tt.messages
				inCounterMutex sync.Mutex
			)

			sorter := NewStreamSorter(
				t.Context(),
				logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second),
				callbackFn)
			sorter.verbose = true

			for range tt.producers {
				out := make(chan redis.XMessage)
				defer close(out) // no leakage, general cleanup after finishing the test run

				go func() {
					for {
						time.Sleep(time.Duration(rand.Int63n(250)) * time.Millisecond)

						inCounterMutex.Lock()
						isFin := inCounter <= 0
						if !isFin {
							inCounter--
						}
						inCounterMutex.Unlock()

						if isFin {
							return
						}

						ms := time.Now().UnixMilli() + rand.Int63n(2_000) - 1_000
						seq := rand.Int63n(100)

						// Add 10% time travelers
						if rand.Int63n(10) == 9 {
							distanceMs := int64(1_500)
							if rand.Int63n(2) > 0 {
								// Don't go back too far. Otherwise, elements would be out of order. Three seconds max.
								ms -= distanceMs
							} else {
								ms += distanceMs
							}
						}

						msg := redis.XMessage{ID: fmt.Sprintf("%d-%d", ms, seq)}
						require.NoError(t, sorter.Submit(msg, nil, out))
					}
				}()

				go outConsumer(out)
			}

			var outCounter int
		breakFor:
			for {
				select {
				case <-outCounterCh:
					outCounter++
					if outCounter == tt.messages {
						break breakFor
					}

				case <-time.After(2 * time.Minute):
					if tt.expectTimeout {
						return
					}
					t.Fatalf("Collecting messages timed out after receiving %d out of %d messages",
						outCounter, tt.messages)
				}
			}
			if tt.expectTimeout {
				t.Fatal("Timeout was expected")
			}

			callbackCollectionMutex.Lock()
			for i := 0; i < len(callbackCollection)-1; i++ {
				parse := func(id string) (int64, int64) {
					parts := strings.Split(id, "-")
					ms, err1 := strconv.ParseInt(parts[0], 10, 64)
					seq, err2 := strconv.ParseInt(parts[1], 10, 64)
					require.NoError(t, cmp.Or(err1, err2))
					return ms, seq
				}

				a, b := callbackCollection[i], callbackCollection[i+1]
				aMs, aSeq := parse(a)
				bMs, bSeq := parse(b)

				switch {
				case aMs < bMs:
				case aMs == bMs:
					if aSeq > bSeq {
						t.Errorf("collection in wrong order: %q before %q", a, b)
					}
				case aMs > bMs:
					t.Errorf("collection in wrong order: %q before %q", a, b)
				}
			}
			callbackCollectionMutex.Unlock()
		})
	}
}
pkg/icingadb/history/sync.go
@@ -21,7 +21,6 @@ import (
	"reflect"
	"slices"
	"sync"
-	"time"
)

// Sync specifies the source and destination of a history sync.
@@ -61,6 +60,11 @@ func (s Sync) Sync(
		return fmt.Errorf("if callbackKeyStructPtr and callbackFn are set, a callbackName is required")
	}

+	var callbackStageFn stageFunc
+	if callbackKeyStructPtr != nil {
+		callbackStageFn = makeSortedCallbackStageFunc(ctx, s.logger, callbackName, callbackKeyStructPtr, callbackFn)
+	}
+
	g, ctx := errgroup.WithContext(ctx)

	for key, pipeline := range syncPipelines {
@@ -98,7 +102,7 @@ func (s Sync) Sync(
			// Shadowed variable to allow appending custom callbacks.
			pipeline := pipeline
			if hasCallbackStage {
-				pipeline = append(slices.Clip(pipeline), makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn))
+				pipeline = append(slices.Clip(pipeline), callbackStageFn)
			}

			ch := make([]chan redis.XMessage, len(pipeline)+1)
@@ -422,39 +426,40 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
	}
}

-// makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message.
+// makeSortedCallbackStageFunc creates a new stageFunc calling the callback function after reordering messages.
//
+// This stageFunc is designed to be used by multiple channels. The internal sorting logic - realized by a StreamSorter -
+// results in all messages being sorted based on their Redis Stream ID and ejected to the callback function in this
+// order.
+//
// The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key.
//
// The callback call is blocking and the message will be forwarded to the out channel after the function has returned.
// Thus, please ensure this function does not block too long.
//
-// If the callback function returns false, the stageFunc switches to a backlog mode, retrying the failed messages and
-// every subsequent message until there are no messages left. Only after a message was successfully handled by the
-// callback method, it will be forwarded to the out channel. Thus, this stage might "block" or "hold back" certain
-// messages during unhappy callback times.
+// If the callback function returns false, the message will be retried after an increasing backoff. All subsequent
+// messages will wait until this one succeeds.
//
// For each successfully submitted message, the telemetry stat named after this callback is incremented. Thus, a delta
// between [telemetry.StatHistory] and this stat indicates blocking callbacks.
-func makeCallbackStageFunc(
+func makeSortedCallbackStageFunc(
+	ctx context.Context,
+	logger *logging.Logger,
	name string,
	keyStructPtrs map[string]any,
	fn func(database.Entity) bool,
) stageFunc {
-	return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
-		defer close(out)
+	sorterCallbackFn := func(msg redis.XMessage, args any) bool {
+		makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) {
+			structPtr, ok := keyStructPtrs[key]
+			if !ok {
+				return nil, fmt.Errorf("key is not part of keyStructPtrs")
+			}

-		structPtr, ok := keyStructPtrs[key]
-		if !ok {
-			return fmt.Errorf("can't lookup struct pointer for key %q", key)
-		}
-
-		structifier := structify.MakeMapStructifier(
-			reflect.TypeOf(structPtr).Elem(),
-			"json",
-			contracts.SafeInit)
-
-		makeEntity := func(values map[string]interface{}) (database.Entity, error) {
+			structifier := structify.MakeMapStructifier(
+				reflect.TypeOf(structPtr).Elem(),
+				"json",
+				contracts.SafeInit)
			val, err := structifier(values)
			if err != nil {
				return nil, errors.Wrapf(err, "can't structify values %#v for %q", values, key)
@@ -468,13 +473,32 @@ func makeCallbackStageFunc(
			return entity, nil
		}

-		backlogLastId := ""
-		backlogMsgCounter := 0
+		key, ok := args.(string)
+		if !ok {
+			// Shall not happen; set to string some thirty lines below
+			panic(fmt.Sprintf("args is of type %T, not string", args))
+		}

-		const backlogTimerMinInterval, backlogTimerMaxInterval = time.Millisecond, time.Minute
-		backlogTimerInterval := backlogTimerMinInterval
-		backlogTimer := time.NewTimer(backlogTimerInterval)
-		_ = backlogTimer.Stop()
+		entity, err := makeEntity(key, msg.Values)
+		if err != nil {
+			logger.Errorw("Failed to create database.Entity out of Redis stream message",
+				zap.Error(err),
+				zap.String("key", key),
+				zap.String("id", msg.ID))
+			return false
+		}
+
+		success := fn(entity)
+		if success {
+			telemetry.Stats.Get(name).Add(1)
+		}
+		return success
+	}
+
+	sorter := NewStreamSorter(ctx, logger, sorterCallbackFn)
+
+	return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
+		defer close(out)
+
		for {
			select {
@@ -483,79 +507,9 @@
					return nil
				}

-				// Only submit the entity directly if there is no backlog.
-				// The second check covers a potential corner case if the XRANGE below races this stream.
-				if backlogLastId != "" && backlogLastId != msg.ID {
-					continue
-				}
-
-				entity, err := makeEntity(msg.Values)
+				err := sorter.Submit(msg, key, out)
				if err != nil {
-					return err
-				}
-
-				if fn(entity) {
-					out <- msg
-					telemetry.Stats.Get(name).Add(1)
-					backlogLastId = ""
-				} else {
-					backlogLastId = msg.ID
-					backlogMsgCounter = 0
-					backlogTimerInterval = backlogTimerMinInterval
-					_ = backlogTimer.Reset(backlogTimerInterval)
-					s.logger.Warnw("Failed to submit entity to callback, entering into backlog",
-						zap.String("key", key),
-						zap.String("id", backlogLastId))
-				}
-
-			case <-backlogTimer.C:
-				if backlogLastId == "" { // Should never happen.
-					return fmt.Errorf("backlog timer logic for %q was called while backlogLastId was empty", key)
-				}
-
-				logger := s.logger.With(
-					zap.String("key", key),
-					zap.String("last-id", backlogLastId))
-
-				logger.Debug("Trying to advance backlog of callback elements")
-
-				xrangeCmd := s.redis.XRangeN(ctx, "icinga:history:stream:"+key, backlogLastId, "+", 2)
-				msgs, err := xrangeCmd.Result()
-				if err != nil {
-					return errors.Wrapf(err, "XRANGE %q to %q on stream %q failed", backlogLastId, "+", key)
-				}
-
-				if len(msgs) < 1 || len(msgs) > 2 {
-					return fmt.Errorf("XRANGE %q to %q on stream %q returned %d messages, not 1 or 2",
-						backlogLastId, "+", key, len(msgs))
-				}
-
-				msg := msgs[0]
-				entity, err := makeEntity(msg.Values)
-				if err != nil {
-					return errors.Wrapf(err, "can't structify backlog value %q for %q", backlogLastId, key)
-				}
-
-				if fn(entity) {
-					out <- msg
-					backlogMsgCounter++
-					telemetry.Stats.Get(name).Add(1)
-
-					if len(msgs) == 1 {
-						backlogLastId = ""
-						logger.Infow("Finished rolling back backlog of callback elements", zap.Int("elements", backlogMsgCounter))
-					} else {
-						backlogLastId = msgs[1].ID
-						backlogTimerInterval = backlogTimerMinInterval
-						_ = backlogTimer.Reset(backlogTimerInterval)
-						logger.Debugw("Advanced backlog",
-							zap.String("new-last-id", backlogLastId),
-							zap.Duration("delay", backlogTimerInterval))
-					}
-				} else {
-					backlogTimerInterval = min(backlogTimerMaxInterval, backlogTimerInterval*2)
-					_ = backlogTimer.Reset(backlogTimerInterval)
-					logger.Warnw("Failed to roll back callback elements", zap.Duration("delay", backlogTimerInterval))
+					s.logger.Errorw("Failed to submit Redis stream event to stream sorter", zap.Error(err))
				}

			case <-ctx.Done():