wlog: Optimized and refactored watcher code.

Signed-off-by: bwplotka <bwplotka@gmail.com>

# Conflicts:
#	tsdb/wlog/watcher_test.go

# Conflicts:
#	tsdb/wlog/watcher_test.go

# Conflicts:
#	tsdb/wlog/watcher.go
This commit is contained in:
bwplotka 2025-03-04 13:28:45 +00:00
parent 30d04792ca
commit 2df21da1a3
6 changed files with 875 additions and 966 deletions

View file

@ -17,12 +17,9 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"os"
"path"
"runtime/pprof"
"sort"
"strconv"
"strings"
"sync"
@ -50,7 +47,6 @@ import (
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -1396,45 +1392,6 @@ func BenchmarkStoreSeries(b *testing.B) {
}
}
func BenchmarkStartup(b *testing.B) {
dir := os.Getenv("WALDIR")
if dir == "" {
b.Skip("WALDIR env var not set")
}
// Find the second largest segment; we will replay up to this.
// (Second largest as WALWatcher will start tailing the largest).
dirents, err := os.ReadDir(path.Join(dir, "wal"))
require.NoError(b, err)
var segments []int
for _, dirent := range dirents {
if i, err := strconv.Atoi(dirent.Name()); err == nil {
segments = append(segments, i)
}
}
sort.Ints(segments)
logger := promslog.New(&promslog.Config{})
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "")
watcherMetrics := wlog.NewWatcherMetrics(nil)
c := NewTestBlockedWriteClient()
// todo: test with new proto type(s)
m := NewQueueManager(metrics, watcherMetrics, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
m.watcher.SetMetrics()
err := m.watcher.Run()
require.NoError(b, err)
}
}
func TestProcessExternalLabels(t *testing.T) {
b := labels.NewBuilder(labels.EmptyLabels())
for i, tc := range []struct {

View file

@ -45,7 +45,6 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics {
if reg != nil {
reg.MustRegister(m.readerCorruptionErrors)
}
return m
}
@ -83,9 +82,6 @@ type LiveReader struct {
total int64 // Total bytes processed during reading in calls to Next().
index int // Used to track partial records, should be 0 at the start of every new record.
// For testing, we can treat EOF as a non-error.
eofNonErr bool
// We sometimes see records span page boundaries. Should never happen, but it
// does. Until we track down why, set permissive to true to tolerate it.
// NB the non-live Reader implementation allows for this.
@ -94,18 +90,19 @@ type LiveReader struct {
metrics *LiveReaderMetrics
}
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
// and Next can be tried again. Non-EOFs are terminal, and the reader should
// not be used again. It is up to the user to decide when to stop trying should
// io.EOF be returned.
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
// and Next can be tried again. Note that LiveReader doesn't know when the
// segment has been read fully, so it will never return a nil error.
//
// See handleFullSegmentPartialReads on one way of handling full segments with
// live reader.
//
// Non-EOFs are terminal, and the reader should not be used again.
func (r *LiveReader) Err() error {
if r.eofNonErr && errors.Is(r.err, io.EOF) {
return nil
}
return r.err
}
// Offset returns the number of bytes consumed from this segment.
// Offset returns the number of bytes consumed from the reader interface.
func (r *LiveReader) Offset() int64 {
return r.total
}
@ -117,7 +114,7 @@ func (r *LiveReader) fillBuffer() (int, error) {
}
// Next returns true if Record() will contain a full record.
// If Next returns false, you should always checked the contents of Error().
// If Next returns false, you should always check the contents of Err().
// Return false guarantees there are no more records if the segment is closed
// and not corrupt, otherwise if Err() == io.EOF you should try again when more
// data has been written.

View file

@ -18,6 +18,7 @@ import (
"bytes"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
@ -54,7 +55,6 @@ var readerConstructors = map[string]func(io.Reader) reader{
},
"LiveReader": func(r io.Reader) reader {
lr := NewLiveReader(promslog.NewNopLogger(), NewLiveReaderMetrics(nil), r)
lr.eofNonErr = true
return lr
},
}
@ -186,7 +186,16 @@ func TestReader(t *testing.T) {
require.Equal(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
}
if !c.fail {
require.NoError(t, r.Err())
if lr, ok := r.(*LiveReader); ok && errors.Is(r.Err(), io.EOF) {
// Live reader does not know if the EOF is because of partial
// segment or full segment. Handle it like all users are supposed to
// handle it.
require.NoError(t, handleFullSegmentPartialReads(r.Err(), lr, func() (int64, error) {
return int64(len(buf)), nil
}))
} else {
require.NoError(t, r.Err())
}
} else {
require.Error(t, r.Err())
}

View file

@ -18,7 +18,6 @@ import (
"fmt"
"io"
"log/slog"
"math"
"os"
"path/filepath"
"strconv"
@ -26,6 +25,7 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/model/labels"
@ -34,36 +34,47 @@ import (
)
const (
checkpointPeriod = 5 * time.Second
// TODO(bwplotka): Checking every 100ms feels too frequent. It might be enough
// to check on notify AND with emergency 15s read only.
segmentCheckPeriod = 100 * time.Millisecond
consumer = "consumer"
)
var (
ErrIgnorable = errors.New("ignore me")
readTimeout = 15 * time.Second
)
// WriteTo is an interface used by the Watcher to send the samples it's read
// from the WAL on to somewhere else. Functions will be called concurrently
// and it is left to the implementer to make sure they are safe.
// from the WAL on to somewhere else. Methods must be concurrency safe.
//
// All record.Ref* slices are only valid until the method returns; implementers
// must not try to reuse the underlying arrays after that.
type WriteTo interface {
// Append and AppendExemplar should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to it's final destination.
// Append should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to its final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
Append([]record.RefSample) bool
// AppendExemplars should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to its final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
AppendExemplars([]record.RefExemplar) bool
// AppendHistograms should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to its final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
AppendHistograms([]record.RefHistogramSample) bool
// AppendFloatHistograms should block until the samples are fully accepted,
// whether enqueued in memory or successfully written to its final destination.
// Once returned, the WAL Watcher will not attempt to pass that data again.
AppendFloatHistograms([]record.RefFloatHistogramSample) bool
StoreSeries([]record.RefSeries, int)
StoreMetadata([]record.RefMetadata)
// UpdateSeriesSegment and SeriesReset are intended for
// garbage-collection:
// First we call UpdateSeriesSegment on all current series.
// UpdateSeriesSegment is intended for GC.
// First we call UpdateSeriesSegment on all the current series, then SeriesReset
// is called to allow the deletion of all series created in a segment lower
// than the argument.
UpdateSeriesSegment([]record.RefSeries, int)
// Then SeriesReset is called to allow the deletion of all series
// created in a segment lower than the argument.
// SeriesReset is intended for GC.
// First we call UpdateSeriesSegment on all the current series, then SeriesReset
// is called to allow the deletion of all series created in a segment lower
// than the argument.
SeriesReset(int)
}
@ -72,6 +83,7 @@ type WriteNotified interface {
Notify()
}
// WatcherMetrics allows sharing metrics across multiple watcher instances.
type WatcherMetrics struct {
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
@ -80,23 +92,23 @@ type WatcherMetrics struct {
notificationsSkipped *prometheus.CounterVec
}
// Watcher watches the TSDB WAL for a given WriteTo.
// Watcher watches the TSDB WAL and writes the data to a given WriteTo.
// See Start and Watch for details.
type Watcher struct {
name string
writer WriteTo
logger *slog.Logger
walDir string
lastCheckpoint string
name string
writer WriteTo
logger *slog.Logger
walDir string
sendExemplars bool
sendHistograms bool
sendMetadata bool
metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics
replayDone bool
startTime time.Time
startTimestamp int64 // the start time as a Prometheus timestamp
sendSamples bool
lastCheckpoint string
metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics
recordsReadMetric *prometheus.CounterVec
recordDecodeFailsMetric prometheus.Counter
samplesSentPreTailing prometheus.Counter
@ -107,13 +119,13 @@ type Watcher struct {
quit chan struct{}
done chan struct{}
// For testing, stop when we hit this segment.
MaxSegment int
checkpointPeriod time.Duration
readTimeout time.Duration
}
func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
m := &WatcherMetrics{
recordsRead: prometheus.NewCounterVec(
recordsRead: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
@ -122,7 +134,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
},
[]string{consumer, "type"},
),
recordDecodeFails: prometheus.NewCounterVec(
recordDecodeFails: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
@ -131,7 +143,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
},
[]string{consumer},
),
samplesSentPreTailing: prometheus.NewCounterVec(
samplesSentPreTailing: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
@ -140,7 +152,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
},
[]string{consumer},
),
currentSegment: prometheus.NewGaugeVec(
currentSegment: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
@ -149,7 +161,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
},
[]string{consumer},
),
notificationsSkipped: prometheus.NewCounterVec(
notificationsSkipped: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
@ -159,23 +171,28 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
[]string{consumer},
),
}
if reg != nil {
reg.MustRegister(m.recordsRead)
reg.MustRegister(m.recordDecodeFails)
reg.MustRegister(m.samplesSentPreTailing)
reg.MustRegister(m.currentSegment)
reg.MustRegister(m.notificationsSkipped)
}
return m
}
// NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger *slog.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher {
// NewWatcher creates a new WAL watcher that watches the WAL for a given WriteTo.
func NewWatcher(
metrics *WatcherMetrics,
readerMetrics *LiveReaderMetrics,
logger *slog.Logger,
name string,
writer WriteTo,
dir string,
sendExemplars, sendHistograms, sendMetadata bool,
) *Watcher {
if logger == nil {
logger = promslog.NewNopLogger()
}
if metrics == nil {
metrics = NewWatcherMetrics(nil)
}
if readerMetrics == nil {
readerMetrics = NewLiveReaderMetrics(nil)
}
return &Watcher{
logger: logger,
writer: writer,
@ -187,11 +204,11 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
sendHistograms: sendHistograms,
sendMetadata: sendMetadata,
readNotify: make(chan struct{}),
quit: make(chan struct{}),
done: make(chan struct{}),
MaxSegment: -1,
readNotify: make(chan struct{}),
quit: make(chan struct{}),
done: make(chan struct{}),
checkpointPeriod: 5 * time.Second,
readTimeout: 15 * time.Second,
}
}
@ -206,84 +223,85 @@ func (w *Watcher) Notify() {
}
}
func (w *Watcher) SetMetrics() {
// Setup the WAL Watchers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's,
// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
if w.metrics != nil {
w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
}
// initMetrics initializes the mandatory per-watcher metrics by binding the
// shared WatcherMetrics vectors to this watcher's name (the consumer label).
// We do this here rather than in the constructor because of the ordering of
// creating Queue Managers, stopping them, and then starting new ones in
// storage/remote/storage.go ApplyConfig.
func (w *Watcher) initMetrics() {
w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
}
// Start the Watcher.
func (w *Watcher) Start() {
w.SetMetrics()
w.logger.Info("Starting WAL watcher", "queue", w.name)
go w.loop()
}
// Stop the Watcher.
// Stop stops the Watcher and waits until it has fully stopped.
func (w *Watcher) Stop() {
close(w.quit)
<-w.done
// Records read metric has series and samples.
if w.metrics != nil {
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
w.metrics.currentSegment.DeleteLabelValues(w.name)
for _, t := range []record.Type{record.Series, record.Samples, record.Tombstones, record.Exemplars, record.MmapMarkers, record.Metadata, record.HistogramSamples, record.FloatHistogramSamples, record.CustomBucketsHistogramSamples, record.CustomBucketsFloatHistogramSamples} {
w.metrics.recordsRead.DeleteLabelValues(w.name, t.String())
}
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
w.metrics.currentSegment.DeleteLabelValues(w.name)
w.logger.Info("WAL watcher stopped", "queue", w.name)
}
func (w *Watcher) loop() {
defer close(w.done)
// Start starts the routine that tails the WAL with time.Now start time, until
// the quit channel is closed. If the tailing returns error it retries with the
// error log. Non-series data. Read Watch for tailing logic details.
func (w *Watcher) Start() {
w.initMetrics()
w.logger.Info("Starting WAL watcher", "queue", w.name)
// We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) {
w.SetStartTime(time.Now())
if err := w.Run(); err != nil {
w.logger.Error("error tailing WAL", "err", err)
}
go func() {
defer close(w.done)
select {
case <-w.quit:
return
case <-time.After(5 * time.Second):
// We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) {
if err := w.Watch(timestamp.FromTime(time.Now())); err != nil {
w.logger.Error("error tailing WAL", "err", err)
}
select {
case <-w.quit:
return
case <-time.After(5 * time.Second):
}
}
}
}()
}
// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func (w *Watcher) Run() error {
_, lastSegment, err := Segments(w.walDir)
if err != nil {
return fmt.Errorf("Segments: %w", err)
// Watch tails the WAL until the quit channel is closed or an error.
//
// Tailing logic writes the known types of WAL records to WriteTo interface.
// - Series are gathered from all the available WAL segments.
// - Other types of data are gathered only from the last segment; the watcher then waits for new data to come in.
// - For samples and histograms startT controls after what timestamp samples should be written to WriteTo.
// This allows retrying watching without reading overlapping data.
func (w *Watcher) Watch(startT int64) error {
if metricsNotInitialized := w.recordsReadMetric == nil; metricsNotInitialized {
w.initMetrics()
}
// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
w.sendSamples = false
_, lastSegment, err := Segments(w.walDir)
if err != nil {
return fmt.Errorf("segments: %w", err)
}
w.replayDone = false
w.logger.Info("Replaying WAL", "queue", w.name)
// Backfill from the checkpoint first if it exists.
// Backfill series from the checkpoint first if it exists.
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
}
if err == nil {
if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegmentSeries); err != nil {
return fmt.Errorf("readCheckpoint: %w", err)
}
}
@ -294,25 +312,15 @@ func (w *Watcher) Run() error {
return err
}
w.logger.Debug("Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
w.logger.Debug("Tailing WAL", "lastCheckpoint", strings.TrimPrefix(lastCheckpoint, w.walDir), "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
for !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment))
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
w.logger.Debug("Processing segment", "currentSegment", currentSegment)
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
if err := w.watchSegment(startT, currentSegment, currentSegment >= lastSegment); err != nil {
return err
}
// For testing: stop when you hit a specific segment.
if currentSegment == w.MaxSegment {
return nil
}
currentSegment++
}
return nil
}
@ -328,34 +336,15 @@ func (w *Watcher) findSegmentForIndex(index int) (int, error) {
return r.index, nil
}
}
return -1, errors.New("failed to find segment for index")
}
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
err := w.readSegment(r, segmentNum, tail)
// Ignore all errors reading to end of segment whilst replaying the WAL.
if !tail {
if err != nil && !errors.Is(err, io.EOF) {
w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if r.Offset() != size {
w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
}
return ErrIgnorable
}
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
}
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *Watcher) watch(segmentNum int, tail bool) error {
// watchSegment tails a single WAL segment.
// Tail parameter indicates that the reader is currently on a segment that is
// actively being written to and watcher should tail it until the quit channel
// is closed. If false, assume it's a full segment, and we're replaying it
// only to cache the series records.
func (w *Watcher) watchSegment(startT int64, segmentNum int, tail bool) error {
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
if err != nil {
return err
@ -364,24 +353,28 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
size := int64(math.MaxInt64)
if !tail {
var err error
size, err = getSegmentSize(w.walDir, segmentNum)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
if err := handleFullSegmentPartialReads(w.readSegmentSeries(reader, segmentNum), reader, getSegmentSizeFn(w.walDir, segmentNum)); err != nil {
// Ignore all errors reading to end of segment whilst replaying the WAL.
w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
}
return w.readAndHandleError(reader, segmentNum, tail, size)
return nil
}
checkpointTicker := time.NewTicker(checkpointPeriod)
// Always try to read from the new segment before we wait for emergency read timeout,
// new segments or notifications. EOFs are not fatal as there might be other
// routine writing to a segment.
if err := w.readSegment(reader, startT, segmentNum); err != nil && !errors.Is(err, io.EOF) {
return err
}
checkpointTicker := time.NewTicker(w.checkpointPeriod)
defer checkpointTicker.Stop()
segmentTicker := time.NewTicker(segmentCheckPeriod)
defer segmentTicker.Stop()
readTicker := time.NewTicker(readTimeout)
readTicker := time.NewTicker(w.readTimeout)
defer readTicker.Stop()
gcSem := make(chan struct{}, 1)
@ -410,35 +403,40 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
// Currently doing a garbage collect, try again later.
}
// if a newer segment is produced, read the current one until the end and move on.
// If a newer segment is produced, read the current one until the end and return
// so we can watch the next segment.
case <-segmentTicker.C:
_, last, err := Segments(w.walDir)
if err != nil {
return fmt.Errorf("Segments: %w", err)
return fmt.Errorf("segments: %w", err)
}
if last > segmentNum {
return w.readAndHandleError(reader, segmentNum, tail, size)
// At this point we expect full segment, so handle LiveReader EOF that
// are false (read more in handleFullSegmentPartialReads).
if err := handleFullSegmentPartialReads(w.readSegment(reader, startT, segmentNum), reader, getSegmentSizeFn(w.walDir, segmentNum)); err != nil {
return fmt.Errorf("read on a new segment: %w", err)
}
return nil
}
// No new segments, continue normal flow.
continue
// we haven't read due to a notification in quite some time, try reading anyways
// We haven't read due to a notification in quite some time, try reading anyway.
case <-readTicker.C:
w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", w.readTimeout)
// EOFs are not fatal as there might be other routine writing to a segment.
if err := w.readSegment(reader, startT, segmentNum); err != nil && !errors.Is(err, io.EOF) {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
readTicker.Reset(w.readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
// EOFs are not fatal as there might be other routine writing to a segment.
if err := w.readSegment(reader, startT, segmentNum); err != nil && !errors.Is(err, io.EOF) {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
readTicker.Reset(w.readTimeout)
}
}
}
@ -475,11 +473,46 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
return nil
}
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
// readSegmentSeries reads the series records from a segment into w.writer.
// Records of unknown type are counted as decode failures; every other record
// type is skipped.
//
// Reading stops when the reader has no more records or the quit channel is
// closed. A series decode error is returned as-is. Otherwise, any error left
// on the reader (including io.EOF when the segment is corrupted or only
// partially written) is returned wrapped with the segment number.
func (w *Watcher) readSegmentSeries(r *LiveReader, segmentNum int) error {
	var (
		dec    = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
		series []record.RefSeries                            // Decode buffer reused across records.
		err    error
	)
	for r.Next() && !isClosed(w.quit) {
		rec := r.Record()
		w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()

		switch dec.Type(rec) {
		case record.Series:
			// Assign to the outer series (no :=) so the decoded slice becomes
			// the buffer for the next record. A shadowed variable would leave
			// the outer slice nil and allocate on every record.
			series, err = dec.Series(rec, series[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			w.writer.StoreSeries(series, segmentNum)
		case record.Unknown:
			// Could be corruption, or reading from a WAL from a newer Prometheus.
			w.recordDecodeFailsMetric.Inc()
		default:
			// We're not interested in other types of records.
		}
	}
	if err = r.Err(); err != nil {
		return fmt.Errorf("segment %d: %w", segmentNum, err)
	}
	return nil
}
// readSegment reads all known records into w.writer from a segment.
// It returns the EOF error if the segment is corrupted or partially written.
func (w *Watcher) readSegment(r *LiveReader, startT int64, segmentNum int) (err error) {
var (
// One table per WAL segment means it won't grow indefinitely.
dec = record.NewDecoder(labels.NewSymbolTable())
// TODO(bwplotka): Consider zeropools.
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
@ -490,8 +523,8 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
floatHistogramsToSend []record.RefFloatHistogramSample
metadata []record.RefMetadata
)
for r.Next() && !isClosed(w.quit) {
var err error
rec := r.Record()
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
@ -505,22 +538,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if !tail {
break
}
samples, err = dec.Samples(rec, samples[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, s := range samples {
if s.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
w.logger.Info("Done replaying WAL", "duration", duration)
if s.T > startT {
if !w.replayDone {
w.replayDone = true
w.logger.Info("Done replaying WAL", "duration", time.Since(timestamp.Time(startT)))
}
samplesToSend = append(samplesToSend, s)
}
@ -535,11 +562,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplars records we see.
// This speeds up replay of the WAL significantly.
if !tail {
break
}
exemplars, err = dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
@ -552,20 +574,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendHistograms {
break
}
if !tail {
break
}
histograms, err = dec.HistogramSamples(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, h := range histograms {
if h.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
w.logger.Info("Done replaying WAL", "duration", duration)
if h.T > startT {
if !w.replayDone {
w.replayDone = true
w.logger.Info("Done replaying WAL", "duration", time.Since(timestamp.Time(startT)))
}
histogramsToSend = append(histogramsToSend, h)
}
@ -580,20 +598,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendHistograms {
break
}
if !tail {
break
}
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, fh := range floatHistograms {
if fh.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
w.logger.Info("Done replaying WAL", "duration", duration)
if fh.T > startT {
if !w.replayDone {
w.replayDone = true
w.logger.Info("Done replaying WAL", "duration", time.Since(timestamp.Time(startT)))
}
floatHistogramsToSend = append(floatHistogramsToSend, fh)
}
@ -607,12 +621,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendMetadata {
break
}
metadata, err = dec.Metadata(rec, metadata[:0])
meta, err := dec.Metadata(rec, metadata[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreMetadata(metadata)
w.writer.StoreMetadata(meta)
case record.Unknown:
// Could be corruption, or reading from a WAL from a newer Prometheus.
@ -628,9 +642,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return nil
}
// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
// readSegmentForGC goes through all series in a segment updating the segmentNum, so we can delete older series.
// It returns the EOF error if the segment is corrupted or partially written.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int) error {
var (
dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function.
series []record.RefSeries
@ -662,12 +676,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
return nil
}
func (w *Watcher) SetStartTime(t time.Time) {
w.startTime = t
w.startTimestamp = timestamp.FromTime(t)
}
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int) error
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
@ -683,29 +692,20 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
return fmt.Errorf("unable to get segments checkpoint dir: %w", err)
}
for _, segRef := range segs {
size, err := getSegmentSize(checkpointDir, segRef.index)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
sr, err := OpenReadSegment(SegmentName(checkpointDir, segRef.index))
if err != nil {
return fmt.Errorf("unable to open segment: %w", err)
}
r := NewLiveReader(w.logger, w.readerMetrics, sr)
err = readFn(w, r, index, false)
err = handleFullSegmentPartialReads(readFn(w, r, index), r, getSegmentSizeFn(checkpointDir, segRef.index))
sr.Close()
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("readSegment: %w", err)
}
if r.Offset() != size {
return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, segRef.index, size, r.Offset())
if err != nil {
return fmt.Errorf("readCheckpoint: %w", err)
}
}
w.logger.Debug("Read series references from checkpoint", "checkpoint", checkpointDir)
w.logger.Debug("Done reading series references from checkpoint", "checkpoint", checkpointDir)
return nil
}
@ -725,16 +725,6 @@ func checkpointNum(dir string) (int, error) {
return result, nil
}
// Get size of segment.
func getSegmentSize(dir string, index int) (int64, error) {
i := int64(-1)
fi, err := os.Stat(SegmentName(dir, index))
if err == nil {
i = fi.Size()
}
return i, err
}
func isClosed(c chan struct{}) bool {
select {
case <-c:
@ -743,3 +733,35 @@ func isClosed(c chan struct{}) bool {
return false
}
}
// handleFullSegmentPartialReads interprets an error obtained while reading a
// segment that the caller knows to be complete. LiveReader reports io.EOF even
// after a full, successful read, so an EOF is only treated as a failure when
// the reader's offset does not match the segment size reported by getSize.
//
// A nil err yields nil, a non-EOF err is returned untouched, and a getSize
// failure is returned as-is.
func handleFullSegmentPartialReads(err error, r *LiveReader, getSize func() (int64, error)) error {
	switch {
	case err == nil:
		// LiveReader never returns a nil error today, but tolerate it in case
		// that changes in the future.
		return nil
	case !errors.Is(err, io.EOF):
		return err
	}

	total, sizeErr := getSize()
	if sizeErr != nil {
		return sizeErr
	}
	if read := r.Offset(); read != total {
		return fmt.Errorf("expected to read the segment fully, but got EOF, may have dropped data; read %v/%v", read, total)
	}
	return nil
}
// getSegmentSizeFn returns a function that stats the segment file segmentNum
// in dir when called and reports its size in bytes. A stat failure is
// returned wrapped, with a size of 0.
func getSegmentSizeFn(dir string, segmentNum int) func() (int64, error) {
	return func() (int64, error) {
		info, statErr := os.Stat(SegmentName(dir, segmentNum))
		if statErr == nil {
			return info.Size(), nil
		}
		return 0, fmt.Errorf("get segment size: %w", statErr)
	}
}

File diff suppressed because it is too large Load diff

69
util/testrecord/record.go Normal file
View file

@ -0,0 +1,69 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testrecord
import (
"math"
"testing"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
// RefSamplesCase names a predefined sample set that GenTestRefSamplesCase can
// generate.
type RefSamplesCase string
const (
// Realistic1000Samples selects 1000 samples that all share one fixed
// timestamp, with values alternating between two extremes.
Realistic1000Samples RefSamplesCase = "real1000"
// WorstCase1000Samples selects 1000 samples whose timestamps and values
// both alternate between two extremes.
WorstCase1000Samples RefSamplesCase = "worst1000"
)
// GenTestRefSamplesCase builds the 1000-sample slice for the requested case.
// Each sample's Ref is its index and its value comes from highVarianceFloat.
// Only the timestamps differ between cases: fixed for Realistic1000Samples,
// alternating via highVarianceInt for WorstCase1000Samples.
// An unrecognised case fails the test through t.Fatal.
func GenTestRefSamplesCase(t testing.TB, c RefSamplesCase) []record.RefSample {
	t.Helper()

	const numSamples = 1000
	out := make([]record.RefSample, numSamples)

	if c != Realistic1000Samples && c != WorstCase1000Samples {
		t.Fatal("unknown case", c)
		return out
	}

	for idx := range out {
		s := &out[idx]
		s.Ref = chunks.HeadSeriesRef(idx)
		s.V = highVarianceFloat(idx)
		if c == WorstCase1000Samples {
			// Worst case is when the values are significantly different
			// to each other which breaks delta encoding.
			s.T = highVarianceInt(idx)
		} else {
			s.T = 12423423
		}
	}
	return out
}
// highVarianceInt returns math.MinInt32 for even i and math.MaxInt32 for
// every other i, so consecutive indices yield maximally distant values.
func highVarianceInt(i int) int64 {
	var v int64 = math.MaxInt32
	if i%2 == 0 {
		v = math.MinInt32
	}
	return v
}
// highVarianceFloat returns math.SmallestNonzeroFloat32 for even i and
// math.MaxFloat32 for every other i, so consecutive indices yield values
// that are far apart.
func highVarianceFloat(i int) float64 {
	switch i % 2 {
	case 0:
		return math.SmallestNonzeroFloat32
	default:
		return math.MaxFloat32
	}
}