mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-03 21:52:13 -04:00
Signed-off-by: bwplotka <bwplotka@gmail.com> # Conflicts: # tsdb/wlog/watcher_test.go # Conflicts: # tsdb/wlog/watcher_test.go # Conflicts: # tsdb/wlog/watcher.go
571 lines
17 KiB
Go
571 lines
17 KiB
Go
// Copyright 2018 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
package wlog
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/common/promslog"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/util/compression"
|
|
)
|
|
|
|
var (
|
|
defaultRetryInterval = 100 * time.Millisecond
|
|
defaultRetries = 100
|
|
)
|
|
|
|
// retry executes f() n times at each interval until it returns true.
|
|
func retry(t testing.TB, interval time.Duration, n int, f func() bool) {
|
|
t.Helper()
|
|
ticker := time.NewTicker(interval)
|
|
for i := 0; i <= n; i++ {
|
|
if f() {
|
|
return
|
|
}
|
|
<-ticker.C
|
|
}
|
|
ticker.Stop()
|
|
t.Logf("function returned false")
|
|
}
|
|
|
|
type writeToMock struct {
|
|
mu sync.Mutex
|
|
|
|
samplesAppended int
|
|
exemplarsAppended int
|
|
histogramsAppended int
|
|
floatHistogramsAppended int
|
|
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
|
|
|
|
// If nonzero, delay reads with a short sleep.
|
|
delay time.Duration
|
|
}
|
|
|
|
func (wtm *writeToMock) Append(s []record.RefSample) bool {
|
|
time.Sleep(wtm.delay)
|
|
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
wtm.samplesAppended += len(s)
|
|
return true
|
|
}
|
|
|
|
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
|
|
time.Sleep(wtm.delay)
|
|
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
wtm.exemplarsAppended += len(e)
|
|
return true
|
|
}
|
|
|
|
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
|
|
time.Sleep(wtm.delay)
|
|
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
wtm.histogramsAppended += len(h)
|
|
return true
|
|
}
|
|
|
|
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
|
|
time.Sleep(wtm.delay)
|
|
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
wtm.floatHistogramsAppended += len(fh)
|
|
return true
|
|
}
|
|
|
|
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
|
|
time.Sleep(wtm.delay)
|
|
wtm.UpdateSeriesSegment(series, index)
|
|
}
|
|
|
|
func (wtm *writeToMock) StoreMetadata(_ []record.RefMetadata) { /* no-op */ }
|
|
|
|
func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) {
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
for _, s := range series {
|
|
wtm.seriesSegmentIndexes[s.Ref] = index
|
|
}
|
|
}
|
|
|
|
func (wtm *writeToMock) SeriesReset(index int) {
|
|
// Check for series that are in segments older than the checkpoint
|
|
// that were not also present in the checkpoint.
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
for k, v := range wtm.seriesSegmentIndexes {
|
|
if v < index {
|
|
delete(wtm.seriesSegmentIndexes, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (wtm *writeToMock) expectEventually(
|
|
t testing.TB, isRunningFn func() bool,
|
|
series, samples, exemplars, histograms int,
|
|
) {
|
|
t.Helper()
|
|
|
|
retry(t, defaultRetryInterval, defaultRetries, func() bool {
|
|
if !isRunningFn() {
|
|
t.Fatal("watcher prematurely finished")
|
|
}
|
|
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
if len(wtm.seriesSegmentIndexes) != series {
|
|
return false
|
|
}
|
|
if wtm.samplesAppended != samples {
|
|
return false
|
|
}
|
|
if wtm.exemplarsAppended != exemplars {
|
|
return false
|
|
}
|
|
if wtm.histogramsAppended+wtm.floatHistogramsAppended != 2*histograms {
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
|
|
wtm.mu.Lock()
|
|
defer wtm.mu.Unlock()
|
|
require.Len(t, wtm.seriesSegmentIndexes, series, "did not receive the expected number of series")
|
|
require.Equal(t, samples, wtm.samplesAppended, "did not receive the expected number of samples")
|
|
require.Equal(t, exemplars, wtm.exemplarsAppended, "did not receive the expected number of exemplars")
|
|
require.Equal(t, histograms, wtm.histogramsAppended, "did not receive the expected number of histograms")
|
|
require.Equal(t, histograms, wtm.floatHistogramsAppended, "did not receive the expected number of float histograms")
|
|
}
|
|
|
|
func newWriteToMock(delay time.Duration) *writeToMock {
|
|
return &writeToMock{
|
|
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
|
|
delay: delay,
|
|
}
|
|
}
|
|
|
|
func newTestWatcher(dir string, to WriteTo) *Watcher {
|
|
return NewWatcher(nil, nil, nil, "test", to, dir, true, true, true)
|
|
}
|
|
|
|
func startWatching(t *testing.T, w *Watcher, startTimeFn func() time.Time) (isRunning func() bool) {
|
|
t.Helper()
|
|
|
|
// It's like watcher.Start() but allows setting startTime and report errors to testutil.
|
|
go func() {
|
|
defer close(w.done)
|
|
require.NoError(t, w.Watch(timestamp.FromTime(startTimeFn())))
|
|
}()
|
|
t.Cleanup(w.Stop)
|
|
|
|
return func() bool {
|
|
select {
|
|
case <-w.done:
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
func logTestWALRecords(t *testing.T, ts time.Time, w *WL, seriesRefOffset, seriesCount, samplesCount, histogramsCount, exemplarsCount int) {
|
|
t.Helper()
|
|
|
|
enc := record.Encoder{}
|
|
|
|
for i := 0; i < seriesCount; i++ {
|
|
ref := (seriesRefOffset * seriesCount) + i + 100
|
|
series := enc.Series([]record.RefSeries{
|
|
{
|
|
Ref: chunks.HeadSeriesRef(ref),
|
|
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
|
|
},
|
|
}, nil)
|
|
require.NoError(t, w.Log(series))
|
|
|
|
for j := 0; j < samplesCount; j++ {
|
|
inner := rand.Intn(ref + 1)
|
|
sample := enc.Samples([]record.RefSample{
|
|
{
|
|
Ref: chunks.HeadSeriesRef(inner),
|
|
T: timestamp.FromTime(ts),
|
|
V: float64(i),
|
|
},
|
|
}, nil)
|
|
require.NoError(t, w.Log(sample))
|
|
}
|
|
|
|
for j := 0; j < exemplarsCount; j++ {
|
|
inner := rand.Intn(ref + 1)
|
|
exemplar := enc.Exemplars([]record.RefExemplar{
|
|
{
|
|
Ref: chunks.HeadSeriesRef(inner),
|
|
T: timestamp.FromTime(ts),
|
|
V: float64(i),
|
|
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)),
|
|
},
|
|
}, nil)
|
|
require.NoError(t, w.Log(exemplar))
|
|
}
|
|
|
|
for j := 0; j < histogramsCount; j++ {
|
|
inner := rand.Intn(ref + 1)
|
|
hist := &histogram.Histogram{
|
|
Schema: 2,
|
|
ZeroThreshold: 1e-128,
|
|
ZeroCount: 0,
|
|
Count: 2,
|
|
Sum: 0,
|
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
PositiveBuckets: []int64{int64(i) + 1},
|
|
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
NegativeBuckets: []int64{int64(-i) - 1},
|
|
}
|
|
|
|
histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{
|
|
Ref: chunks.HeadSeriesRef(inner),
|
|
T: timestamp.FromTime(ts),
|
|
H: hist,
|
|
}}, nil)
|
|
require.NoError(t, w.Log(histograms))
|
|
|
|
customBucketHist := &histogram.Histogram{
|
|
Schema: -53,
|
|
ZeroThreshold: 1e-128,
|
|
ZeroCount: 0,
|
|
Count: 2,
|
|
Sum: 0,
|
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
CustomValues: []float64{float64(i) + 2},
|
|
}
|
|
|
|
customBucketHistograms := enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{{
|
|
Ref: chunks.HeadSeriesRef(inner),
|
|
T: timestamp.FromTime(ts),
|
|
H: customBucketHist,
|
|
}}, nil)
|
|
require.NoError(t, w.Log(customBucketHistograms))
|
|
|
|
floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
|
|
Ref: chunks.HeadSeriesRef(inner),
|
|
T: timestamp.FromTime(ts),
|
|
FH: hist.ToFloat(nil),
|
|
}}, nil)
|
|
require.NoError(t, w.Log(floatHistograms))
|
|
|
|
customBucketFloatHistograms := enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{{
|
|
Ref: chunks.HeadSeriesRef(inner),
|
|
T: timestamp.FromTime(ts),
|
|
FH: customBucketHist.ToFloat(nil),
|
|
}}, nil)
|
|
require.NoError(t, w.Log(customBucketFloatHistograms))
|
|
}
|
|
}
|
|
// Add an unknown record type, which should be ignored.
|
|
require.NoError(t, w.Log([]byte{255}))
|
|
}
|
|
|
|
func expectSegments(t *testing.T, wdir string, expected int) {
|
|
t.Helper()
|
|
|
|
first, last, err := Segments(wdir)
|
|
require.NoError(t, err)
|
|
if first == -1 && last == -1 {
|
|
require.Equal(t, expected, 0, "expected different number of segments, got 0")
|
|
return
|
|
}
|
|
require.Equal(t, expected, (last-first)+1, "expected different number of segments, got %v to %v", first, last)
|
|
}
|
|
|
|
func cutNewSegment(tb testing.TB, w *WL) {
|
|
tb.Helper()
|
|
|
|
_, err := w.NextSegment()
|
|
require.NoError(tb, err)
|
|
}
|
|
|
|
// TestWatch_Live starts a watcher on an empty WAL and expects it to follow all
|
|
// the incoming, live data written the multiple segments.
|
|
func TestWatch_Live(t *testing.T) {
|
|
const (
|
|
seriesCount = 10
|
|
samplesCount = 250
|
|
exemplarsCount = 25
|
|
histogramsCount = 50
|
|
segments = 4
|
|
)
|
|
for _, compress := range compression.Types() {
|
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
|
now := time.Now()
|
|
dir := t.TempDir()
|
|
wdir := path.Join(dir, "wal")
|
|
require.NoError(t, os.Mkdir(wdir, 0o777))
|
|
|
|
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, w.Close())
|
|
}()
|
|
|
|
wt := newWriteToMock(0)
|
|
watcher := newTestWatcher(dir, wt)
|
|
// Start time has to be before now to read all samples correctly.
|
|
isRunningFn := startWatching(t, watcher, func() time.Time { return now.Add(-1 * time.Millisecond) })
|
|
|
|
// Write a few segments.
|
|
ts := now
|
|
for i := range segments {
|
|
logTestWALRecords(t, ts, w, i, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
ts = ts.Add(1 * time.Minute)
|
|
if i < segments-1 {
|
|
cutNewSegment(t, w)
|
|
}
|
|
}
|
|
watcher.Notify() // Notify would be used on every WAL commit anyway.
|
|
expectSegments(t, wdir, segments)
|
|
|
|
// Watcher watched from the beginning so expect all data.
|
|
wt.expectEventually(t, isRunningFn,
|
|
seriesCount*segments,
|
|
// Last segment has 2x samples, exemplars and histograms, but due to start time logic
|
|
// we expect only half.
|
|
seriesCount*samplesCount*segments,
|
|
seriesCount*exemplarsCount*segments,
|
|
seriesCount*histogramsCount*2*segments,
|
|
)
|
|
|
|
// Whole test should not wait for emergency read timeout.
|
|
require.Less(t, time.Since(now), watcher.readTimeout)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestWatch_ReplayStartTime starts a watcher on an already filled WAL. We expect it to
|
|
// replay series from all segments and replay data from the last segment, respecting
|
|
// start time.
|
|
//
|
|
// After the replay we also test some further live data read.
|
|
func TestWatch_ReplayStartTime(t *testing.T) {
|
|
const (
|
|
seriesCount = 10
|
|
samplesCount = 250
|
|
exemplarsCount = 25
|
|
histogramsCount = 50
|
|
initialSegments = 4
|
|
)
|
|
for _, compress := range compression.Types() {
|
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
|
now := time.Now()
|
|
dir := t.TempDir()
|
|
wdir := path.Join(dir, "wal")
|
|
require.NoError(t, os.Mkdir(wdir, 0o777))
|
|
|
|
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, w.Close())
|
|
}()
|
|
|
|
// Write a few segments.
|
|
ts := now
|
|
for i := range initialSegments {
|
|
logTestWALRecords(t, ts, w, i, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
ts = ts.Add(1 * time.Minute)
|
|
if i < initialSegments-1 {
|
|
cutNewSegment(t, w)
|
|
}
|
|
}
|
|
expectSegments(t, wdir, initialSegments)
|
|
|
|
// For the last segment, log more entries, with the new timestamp to test start time logic.
|
|
logTestWALRecords(t, ts.Add(1*time.Minute), w, initialSegments+1, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
expectSegments(t, wdir, initialSegments) // Still the same last segment.
|
|
|
|
// Create a watcher that should replay series and the last segment data.
|
|
// Set start time to ts, so we expect only half of the last segment to be replayed.
|
|
wt := newWriteToMock(0)
|
|
watcher := newTestWatcher(dir, wt)
|
|
isRunningFn := startWatching(t, watcher, func() time.Time { return ts })
|
|
|
|
wt.expectEventually(t, isRunningFn,
|
|
seriesCount*(initialSegments+1),
|
|
// Last segment has 2x samples, exemplars and histograms, but due to start time logic
|
|
// we expect only half.
|
|
seriesCount*samplesCount,
|
|
// TODO(bwplotka): Start time does not apply on exemplars, should it?
|
|
seriesCount*exemplarsCount*2,
|
|
seriesCount*histogramsCount*2,
|
|
)
|
|
|
|
// Cut a new segment and log new data.
|
|
cutNewSegment(t, w)
|
|
logTestWALRecords(t, ts.Add(2*time.Minute), w, initialSegments+2, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
watcher.Notify()
|
|
|
|
wt.expectEventually(t, isRunningFn,
|
|
seriesCount*(initialSegments+2),
|
|
seriesCount*samplesCount*2,
|
|
// TODO(bwplotka): Start time does not apply on exemplars, should it?
|
|
seriesCount*exemplarsCount*3,
|
|
seriesCount*histogramsCount*2*2,
|
|
)
|
|
|
|
// Whole test should not wait for emergency read timeout.
|
|
require.Less(t, time.Since(now), watcher.readTimeout)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestWatch_ReplayWithCheckpoint starts a watcher on an already filled WAL with
|
|
// a checkpoint. We expect it to replay series from all checkpoints and segments
|
|
// and replay data from the last segment.
|
|
//
|
|
// After the replay we also test some further live checkpoint.
|
|
func TestWatch_ReplayWithCheckpoint(t *testing.T) {
|
|
const (
|
|
seriesCount = 10
|
|
samplesCount = 250
|
|
exemplarsCount = 25
|
|
histogramsCount = 50
|
|
initialSegments = 5
|
|
)
|
|
for _, compress := range compression.Types() {
|
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
|
now := time.Now()
|
|
dir := t.TempDir()
|
|
wdir := path.Join(dir, "wal")
|
|
require.NoError(t, os.Mkdir(wdir, 0o777))
|
|
|
|
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, w.Close())
|
|
}()
|
|
|
|
// Write a few segments.
|
|
ts := now
|
|
for i := range initialSegments {
|
|
logTestWALRecords(t, ts, w, i, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
ts = ts.Add(1 * time.Minute)
|
|
if i < initialSegments-1 {
|
|
cutNewSegment(t, w)
|
|
}
|
|
}
|
|
expectSegments(t, wdir, initialSegments)
|
|
|
|
_, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
|
|
require.NoError(t, err)
|
|
require.NoError(t, w.Truncate(2))
|
|
expectSegments(t, wdir, initialSegments-2) // We should see 3 segment as the first 2 were truncated.
|
|
|
|
// Create a watcher that should replay series and the last segment data.
|
|
wt := newWriteToMock(0)
|
|
watcher := newTestWatcher(dir, wt)
|
|
watcher.checkpointPeriod = 500 * time.Millisecond // Checkpoint period is long-ish (best effort), make our tests faster.
|
|
isRunningFn := startWatching(t, watcher, func() time.Time { return now })
|
|
|
|
wt.expectEventually(t, isRunningFn,
|
|
seriesCount*initialSegments,
|
|
seriesCount*samplesCount,
|
|
// TODO(bwplotka): Start time does not apply on exemplars, should it?
|
|
seriesCount*exemplarsCount,
|
|
seriesCount*histogramsCount*2,
|
|
)
|
|
|
|
// During watcher routine, do another checkpoint which deletes all previous series.
|
|
// Then we truncate. We expect watcher to reset the series eventually (GC routine).
|
|
_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 3, func(chunks.HeadSeriesRef, int) bool { return false }, 0)
|
|
require.NoError(t, err)
|
|
err = w.Truncate(4)
|
|
require.NoError(t, err)
|
|
expectSegments(t, wdir, 1)
|
|
|
|
wt.expectEventually(t, isRunningFn,
|
|
seriesCount*2,
|
|
seriesCount*samplesCount,
|
|
// TODO(bwplotka): Start time does not apply on exemplars, should it?
|
|
seriesCount*exemplarsCount,
|
|
seriesCount*histogramsCount*2,
|
|
)
|
|
|
|
// Whole test should not wait for emergency read timeout.
|
|
require.Less(t, time.Since(now), watcher.readTimeout)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestWatch_EmergencyReadTimeout ensures we will read even if we miss notification.
|
|
func TestWatch_EmergencyReadTimeout(t *testing.T) {
|
|
const (
|
|
seriesCount = 10
|
|
samplesCount = 250
|
|
exemplarsCount = 25
|
|
histogramsCount = 50
|
|
)
|
|
for _, compress := range compression.Types() {
|
|
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
|
now := time.Now()
|
|
dir := t.TempDir()
|
|
wdir := path.Join(dir, "wal")
|
|
require.NoError(t, os.Mkdir(wdir, 0o777))
|
|
|
|
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, w.Close())
|
|
}()
|
|
|
|
// Write to a segment.
|
|
ts := now
|
|
logTestWALRecords(t, ts, w, 0, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
ts = ts.Add(1 * time.Minute)
|
|
|
|
// Start watcher.
|
|
wt := newWriteToMock(0)
|
|
watcher := newTestWatcher(dir, wt)
|
|
watcher.readTimeout = 200 * time.Millisecond // Make our test faster for the expected case.
|
|
|
|
// Start time has to be before now to read all samples correctly.
|
|
isRunningFn := startWatching(t, watcher, func() time.Time { return now.Add(-1 * time.Millisecond) })
|
|
|
|
// Write to the same segment, without notification. This will rely on readTimeout.
|
|
logTestWALRecords(t, ts, w, 1, seriesCount, samplesCount, histogramsCount, exemplarsCount)
|
|
expectSegments(t, wdir, 1)
|
|
|
|
// We expect data from the last segment.
|
|
wt.expectEventually(t, isRunningFn,
|
|
seriesCount*2,
|
|
seriesCount*samplesCount*2,
|
|
seriesCount*exemplarsCount*2,
|
|
seriesCount*histogramsCount*2*2,
|
|
)
|
|
})
|
|
}
|
|
}
|