diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 785700f091..fadd554393 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -17,12 +17,9 @@ import ( "context" "errors" "fmt" - "math" "math/rand" "os" - "path" "runtime/pprof" - "sort" "strconv" "strings" "sync" @@ -50,7 +47,6 @@ import ( "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" - "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -1396,45 +1392,6 @@ func BenchmarkStoreSeries(b *testing.B) { } } -func BenchmarkStartup(b *testing.B) { - dir := os.Getenv("WALDIR") - if dir == "" { - b.Skip("WALDIR env var not set") - } - - // Find the second largest segment; we will replay up to this. - // (Second largest as WALWatcher will start tailing the largest). - dirents, err := os.ReadDir(path.Join(dir, "wal")) - require.NoError(b, err) - - var segments []int - for _, dirent := range dirents { - if i, err := strconv.Atoi(dirent.Name()); err == nil { - segments = append(segments, i) - } - } - sort.Ints(segments) - - logger := promslog.New(&promslog.Config{}) - - cfg := testDefaultQueueConfig() - mcfg := config.DefaultMetadataConfig - for n := 0; n < b.N; n++ { - metrics := newQueueManagerMetrics(nil, "", "") - watcherMetrics := wlog.NewWatcherMetrics(nil) - c := NewTestBlockedWriteClient() - // todo: test with new proto type(s) - m := NewQueueManager(metrics, watcherMetrics, nil, logger, dir, - newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) - m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) - m.watcher.MaxSegment = segments[len(segments)-2] - m.watcher.SetMetrics() - err := m.watcher.Run() - 
require.NoError(b, err) - } -} - func TestProcessExternalLabels(t *testing.T) { b := labels.NewBuilder(labels.EmptyLabels()) for i, tc := range []struct { diff --git a/tsdb/wlog/live_reader.go b/tsdb/wlog/live_reader.go index 04f24387bf..427553c890 100644 --- a/tsdb/wlog/live_reader.go +++ b/tsdb/wlog/live_reader.go @@ -45,7 +45,6 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { if reg != nil { reg.MustRegister(m.readerCorruptionErrors) } - return m } @@ -83,9 +82,6 @@ type LiveReader struct { total int64 // Total bytes processed during reading in calls to Next(). index int // Used to track partial records, should be 0 at the start of every new record. - // For testing, we can treat EOF as a non-error. - eofNonErr bool - // We sometime see records span page boundaries. Should never happen, but it // does. Until we track down why, set permissive to true to tolerate it. // NB the non-ive Reader implementation allows for this. @@ -94,18 +90,19 @@ type LiveReader struct { metrics *LiveReaderMetrics } -// Err returns any errors encountered reading the WAL. io.EOFs are not terminal -// and Next can be tried again. Non-EOFs are terminal, and the reader should -// not be used again. It is up to the user to decide when to stop trying should -// io.EOF be returned. +// Err returns any errors encountered reading the WAL. io.EOFs are not terminal +// and Next can be tried again. Note that LiveReader don't know when we read the +// segment fully, so it will never return nil error. +// +// See handleFullSegmentPartialReads on one way of handling full segments with +// live reader. +// +// Non-EOFs are terminal, and the reader should not be used again. func (r *LiveReader) Err() error { - if r.eofNonErr && errors.Is(r.err, io.EOF) { - return nil - } return r.err } -// Offset returns the number of bytes consumed from this segment. +// Offset returns the number of bytes consumed from the reader interface. 
func (r *LiveReader) Offset() int64 { return r.total } @@ -117,7 +114,7 @@ func (r *LiveReader) fillBuffer() (int, error) { } // Next returns true if Record() will contain a full record. -// If Next returns false, you should always checked the contents of Error(). +// If Next returns false, you should always check the contents of Err(). // Return false guarantees there are no more records if the segment is closed // and not corrupt, otherwise if Err() == io.EOF you should try again when more // data has been written. diff --git a/tsdb/wlog/reader_test.go b/tsdb/wlog/reader_test.go index 0cbe882844..33cd71a00d 100644 --- a/tsdb/wlog/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -18,6 +18,7 @@ import ( "bytes" "crypto/rand" "encoding/binary" + "errors" "fmt" "hash/crc32" "io" @@ -54,7 +55,6 @@ var readerConstructors = map[string]func(io.Reader) reader{ }, "LiveReader": func(r io.Reader) reader { lr := NewLiveReader(promslog.NewNopLogger(), NewLiveReaderMetrics(nil), r) - lr.eofNonErr = true return lr }, } @@ -186,7 +186,16 @@ func TestReader(t *testing.T) { require.Equal(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") } if !c.fail { - require.NoError(t, r.Err()) + if lr, ok := r.(*LiveReader); ok && errors.Is(r.Err(), io.EOF) { + // Live reader does not know if the EOF is because of partial + // segment or full segment. Handle it like all users are supposed to + // handle it.
+ require.NoError(t, handleFullSegmentPartialReads(r.Err(), lr, func() (int64, error) { + return int64(len(buf)), nil + })) + } else { + require.NoError(t, r.Err()) + } } else { require.Error(t, r.Err()) } diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index f171a8bdc1..4fe67ac959 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -18,7 +18,6 @@ import ( "fmt" "io" "log/slog" - "math" "os" "path/filepath" "strconv" @@ -26,6 +25,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/promslog" "github.com/prometheus/prometheus/model/labels" @@ -34,36 +34,47 @@ import ( ) const ( - checkpointPeriod = 5 * time.Second + // TODO(bwplotka): Checking every 100ms feels too frequent. It might be enough + // to check on notify AND with emergency 15s read only. segmentCheckPeriod = 100 * time.Millisecond consumer = "consumer" ) -var ( - ErrIgnorable = errors.New("ignore me") - readTimeout = 15 * time.Second -) - // WriteTo is an interface used by the Watcher to send the samples it's read -// from the WAL on to somewhere else. Functions will be called concurrently -// and it is left to the implementer to make sure they are safe. +// from the WAL on to somewhere else. Methods must be concurrency safe. +// +// All record.Ref* slices are only valid until each method finished, implementers +// must not try to reuse the underlying arrays. type WriteTo interface { - // Append and AppendExemplar should block until the samples are fully accepted, - // whether enqueued in memory or successfully written to it's final destination. + // Append should block until the samples are fully accepted, + // whether enqueued in memory or successfully written to its final destination. // Once returned, the WAL Watcher will not attempt to pass that data again. 
Append([]record.RefSample) bool + // AppendExemplars should block until the samples are fully accepted, + // whether enqueued in memory or successfully written to its final destination. + // Once returned, the WAL Watcher will not attempt to pass that data again. AppendExemplars([]record.RefExemplar) bool + // AppendHistograms should block until the samples are fully accepted, + // whether enqueued in memory or successfully written to its final destination. + // Once returned, the WAL Watcher will not attempt to pass that data again. AppendHistograms([]record.RefHistogramSample) bool + // AppendFloatHistograms should block until the samples are fully accepted, + // whether enqueued in memory or successfully written to its final destination. + // Once returned, the WAL Watcher will not attempt to pass that data again. AppendFloatHistograms([]record.RefFloatHistogramSample) bool + StoreSeries([]record.RefSeries, int) StoreMetadata([]record.RefMetadata) - // UpdateSeriesSegment and SeriesReset are intended for - // garbage-collection: - // First we call UpdateSeriesSegment on all current series. + // UpdateSeriesSegment is intended for GC. + // First we call UpdateSeriesSegment on all the current series, then SeriesReset + // is called to allow the deletion of all series created in a segment lower + // than the argument. UpdateSeriesSegment([]record.RefSeries, int) - // Then SeriesReset is called to allow the deletion of all series - // created in a segment lower than the argument. + // SeriesReset is intended for GC. + // First we call UpdateSeriesSegment on all the current series, then SeriesReset + // is called to allow the deletion of all series created in a segment lower + // than the argument. SeriesReset(int) } @@ -72,6 +83,7 @@ type WriteNotified interface { Notify() } +// WatcherMetrics allows sharing metrics across multiple watcher instances. 
type WatcherMetrics struct { recordsRead *prometheus.CounterVec recordDecodeFails *prometheus.CounterVec @@ -80,23 +92,23 @@ type WatcherMetrics struct { notificationsSkipped *prometheus.CounterVec } -// Watcher watches the TSDB WAL for a given WriteTo. +// Watcher watches the TSDB WAL and writes the data to a given WriteTo. +// See Start and Watch for details. type Watcher struct { - name string - writer WriteTo - logger *slog.Logger - walDir string - lastCheckpoint string + name string + writer WriteTo + logger *slog.Logger + walDir string + sendExemplars bool sendHistograms bool sendMetadata bool - metrics *WatcherMetrics - readerMetrics *LiveReaderMetrics + replayDone bool - startTime time.Time - startTimestamp int64 // the start time as a Prometheus timestamp - sendSamples bool + lastCheckpoint string + metrics *WatcherMetrics + readerMetrics *LiveReaderMetrics recordsReadMetric *prometheus.CounterVec recordDecodeFailsMetric prometheus.Counter samplesSentPreTailing prometheus.Counter @@ -107,13 +119,13 @@ type Watcher struct { quit chan struct{} done chan struct{} - // For testing, stop when we hit this segment. 
- MaxSegment int + checkpointPeriod time.Duration + readTimeout time.Duration } func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { m := &WatcherMetrics{ - recordsRead: prometheus.NewCounterVec( + recordsRead: promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", @@ -122,7 +134,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { }, []string{consumer, "type"}, ), - recordDecodeFails: prometheus.NewCounterVec( + recordDecodeFails: promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", @@ -131,7 +143,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { }, []string{consumer}, ), - samplesSentPreTailing: prometheus.NewCounterVec( + samplesSentPreTailing: promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", @@ -140,7 +152,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { }, []string{consumer}, ), - currentSegment: prometheus.NewGaugeVec( + currentSegment: promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", @@ -149,7 +161,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { }, []string{consumer}, ), - notificationsSkipped: prometheus.NewCounterVec( + notificationsSkipped: promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", @@ -159,23 +171,28 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { []string{consumer}, ), } - - if reg != nil { - reg.MustRegister(m.recordsRead) - reg.MustRegister(m.recordDecodeFails) - reg.MustRegister(m.samplesSentPreTailing) - reg.MustRegister(m.currentSegment) - reg.MustRegister(m.notificationsSkipped) - } - return m } -// NewWatcher creates a new WAL watcher for a given WriteTo. 
-func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger *slog.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher { +// NewWatcher creates a new WAL watcher that watches the WAL for a given WriteTo. +func NewWatcher( + metrics *WatcherMetrics, + readerMetrics *LiveReaderMetrics, + logger *slog.Logger, + name string, + writer WriteTo, + dir string, + sendExemplars, sendHistograms, sendMetadata bool, +) *Watcher { if logger == nil { logger = promslog.NewNopLogger() } + if metrics == nil { + metrics = NewWatcherMetrics(nil) + } + if readerMetrics == nil { + readerMetrics = NewLiveReaderMetrics(nil) + } return &Watcher{ logger: logger, writer: writer, @@ -187,11 +204,11 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge sendHistograms: sendHistograms, sendMetadata: sendMetadata, - readNotify: make(chan struct{}), - quit: make(chan struct{}), - done: make(chan struct{}), - - MaxSegment: -1, + readNotify: make(chan struct{}), + quit: make(chan struct{}), + done: make(chan struct{}), + checkpointPeriod: 5 * time.Second, + readTimeout: 15 * time.Second, } } @@ -206,84 +223,85 @@ func (w *Watcher) Notify() { } } -func (w *Watcher) SetMetrics() { - // Setup the WAL Watchers metrics. We do this here rather than in the - // constructor because of the ordering of creating Queue Managers's, - // stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
- if w.metrics != nil { - w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name}) - w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name) - w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name) - w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name) - w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name) - } +// initMetrics initializes the mandatory shared watcher metrics. +// We do this here rather than in the constructor because of the ordering of +// creating Queue Managers's, stopping them, and then starting new ones in +// storage/remote/storage.go ApplyConfig. +func (w *Watcher) initMetrics() { + w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name}) + w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name) + w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name) + w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name) + w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name) } -// Start the Watcher. -func (w *Watcher) Start() { - w.SetMetrics() - w.logger.Info("Starting WAL watcher", "queue", w.name) - - go w.loop() -} - -// Stop the Watcher. +// Stop the Watcher and waits until it fully stops. func (w *Watcher) Stop() { close(w.quit) <-w.done - // Records read metric has series and samples.
- if w.metrics != nil { - w.metrics.recordsRead.DeleteLabelValues(w.name, "series") - w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") - w.metrics.recordDecodeFails.DeleteLabelValues(w.name) - w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) - w.metrics.currentSegment.DeleteLabelValues(w.name) + for _, t := range []record.Type{record.Series, record.Samples, record.Tombstones, record.Exemplars, record.MmapMarkers, record.Metadata, record.HistogramSamples, record.FloatHistogramSamples, record.CustomBucketsHistogramSamples, record.CustomBucketsFloatHistogramSamples} { + w.metrics.recordsRead.DeleteLabelValues(w.name, t.String()) } + w.metrics.recordDecodeFails.DeleteLabelValues(w.name) + w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) + w.metrics.currentSegment.DeleteLabelValues(w.name) w.logger.Info("WAL watcher stopped", "queue", w.name) } -func (w *Watcher) loop() { - defer close(w.done) +// Start starts the routine that tails the WAL with time.Now start time, until +// the quit channel is closed. If the tailing returns error it retries with the +// error log. Non-series data. Read Watch for tailing logic details. +func (w *Watcher) Start() { + w.initMetrics() + w.logger.Info("Starting WAL watcher", "queue", w.name) - // We may encounter failures processing the WAL; we should wait and retry. - for !isClosed(w.quit) { - w.SetStartTime(time.Now()) - if err := w.Run(); err != nil { - w.logger.Error("error tailing WAL", "err", err) - } + go func() { + defer close(w.done) - select { - case <-w.quit: - return - case <-time.After(5 * time.Second): + // We may encounter failures processing the WAL; we should wait and retry. 
+ for !isClosed(w.quit) { + if err := w.Watch(timestamp.FromTime(time.Now())); err != nil { + w.logger.Error("error tailing WAL", "err", err) + } + + select { + case <-w.quit: + return + case <-time.After(5 * time.Second): + } } - } + }() } -// Run the watcher, which will tail the WAL until the quit channel is closed -// or an error case is hit. -func (w *Watcher) Run() error { - _, lastSegment, err := Segments(w.walDir) - if err != nil { - return fmt.Errorf("Segments: %w", err) +// Watch tails the WAL until the quit channel is closed or an error. +// +// Tailing logic writes the known types of WAL records to WriteTo interface. +// - Series are gathered from all the available WAL segments. +// - Other type of data is gathered only from the last segment and waits for the new data to come in. +// - For samples and histograms startT controls after what timestamp samples should be written to WriteTo. +// This allows retrying watching without reading overlapping data. +func (w *Watcher) Watch(startT int64) error { + if metricsNotInitialized := w.recordsReadMetric == nil; metricsNotInitialized { + w.initMetrics() } - // We want to ensure this is false across iterations since - // Run will be called again if there was a failure to read the WAL. - w.sendSamples = false - + _, lastSegment, err := Segments(w.walDir) + if err != nil { + return fmt.Errorf("segments: %w", err) + } + w.replayDone = false w.logger.Info("Replaying WAL", "queue", w.name) - // Backfill from the checkpoint first if it exists. + // Backfill series from the checkpoint first if it exists. 
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) if err != nil && !errors.Is(err, record.ErrNotFound) { return fmt.Errorf("tsdb.LastCheckpoint: %w", err) } if err == nil { - if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil { + if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegmentSeries); err != nil { return fmt.Errorf("readCheckpoint: %w", err) } } @@ -294,25 +312,15 @@ func (w *Watcher) Run() error { return err } - w.logger.Debug("Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) + w.logger.Debug("Tailing WAL", "lastCheckpoint", strings.TrimPrefix(lastCheckpoint, w.walDir), "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) for !isClosed(w.quit) { w.currentSegmentMetric.Set(float64(currentSegment)) - - // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. - // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. w.logger.Debug("Processing segment", "currentSegment", currentSegment) - if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) { + if err := w.watchSegment(startT, currentSegment, currentSegment >= lastSegment); err != nil { return err } - - // For testing: stop when you hit a specific segment. - if currentSegment == w.MaxSegment { - return nil - } - currentSegment++ } - return nil } @@ -328,34 +336,15 @@ func (w *Watcher) findSegmentForIndex(index int) (int, error) { return r.index, nil } } - return -1, errors.New("failed to find segment for index") } -func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error { - err := w.readSegment(r, segmentNum, tail) - - // Ignore all errors reading to end of segment whilst replaying the WAL. 
- if !tail { - if err != nil && !errors.Is(err, io.EOF) { - w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) - } else if r.Offset() != size { - w.logger.Warn("Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size) - } - return ErrIgnorable - } - - // Otherwise, when we are tailing, non-EOFs are fatal. - if err != nil && !errors.Is(err, io.EOF) { - return err - } - return nil -} - -// Use tail true to indicate that the reader is currently on a segment that is -// actively being written to. If false, assume it's a full segment and we're -// replaying it on start to cache the series records. -func (w *Watcher) watch(segmentNum int, tail bool) error { +// watchSegment tails a single WAL segment. +// Tail parameter indicates that the reader is currently on a segment that is +// actively being written to and watcher should tail it until the quit channel +// is closed. If false, assume it's a full segment, and we're replaying it only +// to cache the series records. +func (w *Watcher) watchSegment(startT int64, segmentNum int, tail bool) error { segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum)) if err != nil { return err @@ -364,24 +353,28 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { reader := NewLiveReader(w.logger, w.readerMetrics, segment) - size := int64(math.MaxInt64) if !tail { - var err error - size, err = getSegmentSize(w.walDir, segmentNum) - if err != nil { - return fmt.Errorf("getSegmentSize: %w", err) + if err := handleFullSegmentPartialReads(w.readSegmentSeries(reader, segmentNum), reader, getSegmentSizeFn(w.walDir, segmentNum)); err != nil { + // Ignore all errors reading to end of segment whilst replaying the WAL.
+ w.logger.Warn("Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) } - - return w.readAndHandleError(reader, segmentNum, tail, size) + return nil } - checkpointTicker := time.NewTicker(checkpointPeriod) + // Always try to read from the new segment before we wait for emergency read timeout, + // new segments or notifications. EOFs are not fatal as there might be other + // routine writing to a segment. + if err := w.readSegment(reader, startT, segmentNum); err != nil && !errors.Is(err, io.EOF) { + return err + } + + checkpointTicker := time.NewTicker(w.checkpointPeriod) defer checkpointTicker.Stop() segmentTicker := time.NewTicker(segmentCheckPeriod) defer segmentTicker.Stop() - readTicker := time.NewTicker(readTimeout) + readTicker := time.NewTicker(w.readTimeout) defer readTicker.Stop() gcSem := make(chan struct{}, 1) @@ -410,35 +403,40 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { // Currently doing a garbage collect, try again later. } - // if a newer segment is produced, read the current one until the end and move on. + // If a newer segment is produced, read the current one until the end and return + // so we can watch the next segment. case <-segmentTicker.C: _, last, err := Segments(w.walDir) if err != nil { - return fmt.Errorf("Segments: %w", err) + return fmt.Errorf("segments: %w", err) } - if last > segmentNum { - return w.readAndHandleError(reader, segmentNum, tail, size) + // At this point we expect full segment, so handle LiveReader EOF that + // are false (read more in handleFullSegmentPartialReads). + if err := handleFullSegmentPartialReads(w.readSegment(reader, startT, segmentNum), reader, getSegmentSizeFn(w.walDir, segmentNum)); err != nil { + return fmt.Errorf("read on a new segment: %w", err) + } + return nil } + // No new segments, continue normal flow. 
continue - // we haven't read due to a notification in quite some time, try reading anyways + // We haven't read due to a notification in quite some time, try reading anyway. case <-readTicker.C: - w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout) - err := w.readAndHandleError(reader, segmentNum, tail, size) - if err != nil { + w.logger.Debug("Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", w.readTimeout) + + // EOFs are not fatal as there might be other routine writing to a segment. + if err := w.readSegment(reader, startT, segmentNum); err != nil && !errors.Is(err, io.EOF) { return err } - // reset the ticker so we don't read too often - readTicker.Reset(readTimeout) + readTicker.Reset(w.readTimeout) case <-w.readNotify: - err := w.readAndHandleError(reader, segmentNum, tail, size) - if err != nil { + // EOFs are not fatal as there might be other routine writing to a segment. + if err := w.readSegment(reader, startT, segmentNum); err != nil && !errors.Is(err, io.EOF) { return err } - // reset the ticker so we don't read too often - readTicker.Reset(readTimeout) + readTicker.Reset(w.readTimeout) } } } @@ -475,11 +473,46 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { return nil } -// Read from a segment and pass the details to w.writer. -// Also used with readCheckpoint - implements segmentReadFn. -func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { +// readSegmentSeries reads the series records into w.writer from a segment. +// It returns the EOF error if the segment is corrupted or partially written. +func (w *Watcher) readSegmentSeries(r *LiveReader, segmentNum int) error { var ( - dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely. 
+ dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely. + series []record.RefSeries + ) + for r.Next() && !isClosed(w.quit) { + rec := r.Record() + w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc() + + switch dec.Type(rec) { + case record.Series: + series, err := dec.Series(rec, series[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.writer.StoreSeries(series, segmentNum) + case record.Unknown: + // Could be corruption, or reading from a WAL from a newer Prometheus. + w.recordDecodeFailsMetric.Inc() + default: + // We're not interested in other types of records. + } + } + if err := r.Err(); err != nil { + return fmt.Errorf("segment %d: %w", segmentNum, err) + } + return nil +} + +// readSegment reads all known records into w.writer from a segment. +// It returns the EOF error if the segment is corrupted or partially written. +func (w *Watcher) readSegment(r *LiveReader, startT int64, segmentNum int) (err error) { + var ( + // One table per WAL segment means it won't grow indefinitely. + dec = record.NewDecoder(labels.NewSymbolTable()) + + // TODO(bwplotka): Consider zeropools. series []record.RefSeries samples []record.RefSample samplesToSend []record.RefSample @@ -490,8 +523,8 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { floatHistogramsToSend []record.RefFloatHistogramSample metadata []record.RefMetadata ) + for r.Next() && !isClosed(w.quit) { - var err error rec := r.Record() w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc() @@ -505,22 +538,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.writer.StoreSeries(series, segmentNum) case record.Samples: - // If we're not tailing a segment we can ignore any samples records we see. - // This speeds up replay of the WAL by > 10x. 
- if !tail { - break - } samples, err = dec.Samples(rec, samples[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err } for _, s := range samples { - if s.T > w.startTimestamp { - if !w.sendSamples { - w.sendSamples = true - duration := time.Since(w.startTime) - w.logger.Info("Done replaying WAL", "duration", duration) + if s.T > startT { + if !w.replayDone { + w.replayDone = true + w.logger.Info("Done replaying WAL", "duration", time.Since(timestamp.Time(startT))) } samplesToSend = append(samplesToSend, s) } @@ -535,11 +562,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendExemplars { break } - // If we're not tailing a segment we can ignore any exemplars records we see. - // This speeds up replay of the WAL significantly. - if !tail { - break - } exemplars, err = dec.Exemplars(rec, exemplars[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() @@ -552,20 +574,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendHistograms { break } - if !tail { - break - } histograms, err = dec.HistogramSamples(rec, histograms[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err } for _, h := range histograms { - if h.T > w.startTimestamp { - if !w.sendSamples { - w.sendSamples = true - duration := time.Since(w.startTime) - w.logger.Info("Done replaying WAL", "duration", duration) + if h.T > startT { + if !w.replayDone { + w.replayDone = true + w.logger.Info("Done replaying WAL", "duration", time.Since(timestamp.Time(startT))) } histogramsToSend = append(histogramsToSend, h) } @@ -580,20 +598,16 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendHistograms { break } - if !tail { - break - } floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err } for _, fh := range floatHistograms { - if fh.T > w.startTimestamp { - if !w.sendSamples { - w.sendSamples = 
true - duration := time.Since(w.startTime) - w.logger.Info("Done replaying WAL", "duration", duration) + if fh.T > startT { + if !w.replayDone { + w.replayDone = true + w.logger.Info("Done replaying WAL", "duration", time.Since(timestamp.Time(startT))) } floatHistogramsToSend = append(floatHistogramsToSend, fh) } @@ -607,12 +621,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { if !w.sendMetadata { break } - metadata, err = dec.Metadata(rec, metadata[:0]) + meta, err := dec.Metadata(rec, metadata[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err } - w.writer.StoreMetadata(metadata) + w.writer.StoreMetadata(meta) case record.Unknown: // Could be corruption, or reading from a WAL from a newer Prometheus. @@ -628,9 +642,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { return nil } -// Go through all series in a segment updating the segmentNum, so we can delete older series. -// Used with readCheckpoint - implements segmentReadFn. -func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error { +// readSegmentForGC goes through all series in a segment updating the segmentNum, so we can delete older series. +// It returns the EOF error if the segment is corrupted or partially written. +func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int) error { var ( dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function. series []record.RefSeries @@ -662,12 +676,7 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error return nil } -func (w *Watcher) SetStartTime(t time.Time) { - w.startTime = t - w.startTimestamp = timestamp.FromTime(t) -} - -type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error +type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int) error // Read all the series records from a Checkpoint directory. 
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error { @@ -683,29 +692,20 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err return fmt.Errorf("unable to get segments checkpoint dir: %w", err) } for _, segRef := range segs { - size, err := getSegmentSize(checkpointDir, segRef.index) - if err != nil { - return fmt.Errorf("getSegmentSize: %w", err) - } - sr, err := OpenReadSegment(SegmentName(checkpointDir, segRef.index)) if err != nil { return fmt.Errorf("unable to open segment: %w", err) } r := NewLiveReader(w.logger, w.readerMetrics, sr) - err = readFn(w, r, index, false) + err = handleFullSegmentPartialReads(readFn(w, r, index), r, getSegmentSizeFn(checkpointDir, segRef.index)) sr.Close() - if err != nil && !errors.Is(err, io.EOF) { - return fmt.Errorf("readSegment: %w", err) - } - - if r.Offset() != size { - return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, segRef.index, size, r.Offset()) + if err != nil { + return fmt.Errorf("readCheckpoint: %w", err) } } - w.logger.Debug("Read series references from checkpoint", "checkpoint", checkpointDir) + w.logger.Debug("Done reading series references from checkpoint", "checkpoint", checkpointDir) return nil } @@ -725,16 +725,6 @@ func checkpointNum(dir string) (int, error) { return result, nil } -// Get size of segment. -func getSegmentSize(dir string, index int) (int64, error) { - i := int64(-1) - fi, err := os.Stat(SegmentName(dir, index)) - if err == nil { - i = fi.Size() - } - return i, err -} - func isClosed(c chan struct{}) bool { select { case <-c: @@ -743,3 +733,35 @@ func isClosed(c chan struct{}) bool { return false } } + +// handleFullSegmentPartialReads handles LiveReader derived errors in case of knowingly +// full segment read. This is needed because LiveReader always returns EOF, even +// for full, successful segment reads. 
+func handleFullSegmentPartialReads(err error, r *LiveReader, getSize func() (int64, error)) error { + if err == nil { + // LiveReader never returns non-nil errors, but handle this, might happen in the future. + return nil + } + if !errors.Is(err, io.EOF) { + return err + } + + size, err := getSize() + if err != nil { + return err + } + if r.Offset() == size { + return nil + } + return fmt.Errorf("expected to read the segment fully, but got EOF, may have dropped data; read %v/%v", r.Offset(), size) +} + +func getSegmentSizeFn(dir string, segmentNum int) func() (int64, error) { + return func() (int64, error) { + fi, err := os.Stat(SegmentName(dir, segmentNum)) + if err != nil { + return 0, fmt.Errorf("get segment size: %w", err) + } + return fi.Size(), nil + } +} diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 6aebe7dab3..a9a6f46208 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -14,34 +14,34 @@ package wlog import ( "fmt" + "log/slog" + "math" "math/rand" "os" "path" - "runtime" "sync" "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/prometheus/util/testrecord" ) var ( defaultRetryInterval = 100 * time.Millisecond defaultRetries = 100 - wMetrics = NewWatcherMetrics(prometheus.DefaultRegisterer) ) // retry executes f() n times at each interval until it returns true. 
-func retry(t *testing.T, interval time.Duration, n int, f func() bool) { +func retry(t testing.TB, interval time.Duration, n int, f func() bool) { t.Helper() ticker := time.NewTicker(interval) for i := 0; i <= n; i++ { @@ -54,19 +54,13 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) { t.Logf("function returned false") } -// Overwrite readTimeout defined in watcher.go. -func overwriteReadTimeout(t *testing.T, val time.Duration) { - initialVal := readTimeout - readTimeout = val - t.Cleanup(func() { readTimeout = initialVal }) -} - type writeToMock struct { + mu sync.Mutex + samplesAppended int exemplarsAppended int histogramsAppended int floatHistogramsAppended int - seriesLock sync.Mutex seriesSegmentIndexes map[chunks.HeadSeriesRef]int // If nonzero, delay reads with a short sleep. @@ -75,24 +69,36 @@ type writeToMock struct { func (wtm *writeToMock) Append(s []record.RefSample) bool { time.Sleep(wtm.delay) + + wtm.mu.Lock() + defer wtm.mu.Unlock() wtm.samplesAppended += len(s) return true } func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { time.Sleep(wtm.delay) + + wtm.mu.Lock() + defer wtm.mu.Unlock() wtm.exemplarsAppended += len(e) return true } func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { time.Sleep(wtm.delay) + + wtm.mu.Lock() + defer wtm.mu.Unlock() wtm.histogramsAppended += len(h) return true } func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool { time.Sleep(wtm.delay) + + wtm.mu.Lock() + defer wtm.mu.Unlock() wtm.floatHistogramsAppended += len(fh) return true } @@ -105,8 +111,8 @@ func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { func (wtm *writeToMock) StoreMetadata(_ []record.RefMetadata) { /* no-op */ } func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) { - wtm.seriesLock.Lock() - defer wtm.seriesLock.Unlock() + wtm.mu.Lock() + defer wtm.mu.Unlock() for _, s := range series { 
wtm.seriesSegmentIndexes[s.Ref] = index } @@ -115,8 +121,8 @@ func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int func (wtm *writeToMock) SeriesReset(index int) { // Check for series that are in segments older than the checkpoint // that were not also present in the checkpoint. - wtm.seriesLock.Lock() - defer wtm.seriesLock.Unlock() + wtm.mu.Lock() + defer wtm.mu.Unlock() for k, v := range wtm.seriesSegmentIndexes { if v < index { delete(wtm.seriesSegmentIndexes, k) @@ -124,12 +130,49 @@ func (wtm *writeToMock) SeriesReset(index int) { } } -func (wtm *writeToMock) checkNumSeries() int { - wtm.seriesLock.Lock() - defer wtm.seriesLock.Unlock() +func (wtm *writeToMock) seriesStored() int { + wtm.mu.Lock() + defer wtm.mu.Unlock() return len(wtm.seriesSegmentIndexes) } +func (wtm *writeToMock) expectEventually( + t testing.TB, isRunningFn func() bool, + series, samples, exemplars, histograms int, +) { + t.Helper() + + retry(t, defaultRetryInterval, defaultRetries, func() bool { + if !isRunningFn() { + t.Fatal("watcher prematurely finished") + } + + wtm.mu.Lock() + defer wtm.mu.Unlock() + if len(wtm.seriesSegmentIndexes) != series { + return false + } + if wtm.samplesAppended != samples { + return false + } + if wtm.exemplarsAppended != exemplars { + return false + } + if wtm.histogramsAppended+wtm.floatHistogramsAppended != 2*histograms { + return false + } + return true + }) + + wtm.mu.Lock() + defer wtm.mu.Unlock() + require.Len(t, wtm.seriesSegmentIndexes, series, "did not receive the expected number of series") + require.Equal(t, samples, wtm.samplesAppended, "did not receive the expected number of samples") + require.Equal(t, exemplars, wtm.exemplarsAppended, "did not receive the expected number of exemplars") + require.Equal(t, histograms, wtm.histogramsAppended, "did not receive the expected number of histograms") + require.Equal(t, histograms, wtm.floatHistogramsAppended, "did not receive the expected number of float histograms") +} 
+ func newWriteToMock(delay time.Duration) *writeToMock { return &writeToMock{ seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), @@ -137,699 +180,511 @@ func newWriteToMock(delay time.Duration) *writeToMock { } } -func TestTailSamples(t *testing.T) { - pageSize := 32 * 1024 - const seriesCount = 10 - const samplesCount = 250 - const exemplarsCount = 25 - const histogramsCount = 50 - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - now := time.Now() +func newTestWatcher(dir string, to WriteTo) *Watcher { + return NewWatcher(nil, nil, nil, "test", to, dir, true, true, true) +} - dir := t.TempDir() +func startWatching(t *testing.T, w *Watcher, startTimeFn func() time.Time) (isRunning func() bool) { + t.Helper() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) + // It's like watcher.Start() but allows setting startTime and report errors to testutil. + go func() { + defer close(w.done) + require.NoError(t, w.Watch(timestamp.FromTime(startTimeFn()))) + }() + t.Cleanup(w.Stop) - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - // Write to the initial segment then checkpoint. 
- for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - - for j := 0; j < exemplarsCount; j++ { - inner := rand.Intn(ref + 1) - exemplar := enc.Exemplars([]record.RefExemplar{ - { - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - V: float64(i), - Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)), - }, - }, nil) - require.NoError(t, w.Log(exemplar)) - } - - for j := 0; j < histogramsCount; j++ { - inner := rand.Intn(ref + 1) - hist := &histogram.Histogram{ - Schema: 2, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - PositiveBuckets: []int64{int64(i) + 1}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, - NegativeBuckets: []int64{int64(-i) - 1}, - } - - histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - H: hist, - }}, nil) - require.NoError(t, w.Log(histograms)) - - customBucketHist := &histogram.Histogram{ - Schema: -53, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - CustomValues: []float64{float64(i) + 2}, - } - - customBucketHistograms := enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - H: customBucketHist, - }}, nil) - require.NoError(t, w.Log(customBucketHistograms)) - - floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ - Ref: 
chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - FH: hist.ToFloat(nil), - }}, nil) - require.NoError(t, w.Log(floatHistograms)) - - customBucketFloatHistograms := enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - FH: customBucketHist.ToFloat(nil), - }}, nil) - require.NoError(t, w.Log(customBucketFloatHistograms)) - } - } - - // Start read after checkpoint, no more data written. - first, last, err := Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true) - watcher.SetStartTime(now) - - // Set the Watcher's metrics so they're not nil pointers. - watcher.SetMetrics() - for i := first; i <= last; i++ { - segment, err := OpenReadSegment(SegmentName(watcher.walDir, i)) - require.NoError(t, err) - - reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) - // Use tail true so we can ensure we got the right number of samples. 
- watcher.readSegment(reader, i, true) - require.NoError(t, segment.Close()) - } - - expectedSeries := seriesCount - expectedSamples := seriesCount * samplesCount - expectedExemplars := seriesCount * exemplarsCount - expectedHistograms := seriesCount * histogramsCount * 2 - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries - }) - require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series") - require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") - require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") - require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms") - require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms") - }) + return func() bool { + select { + case <-w.done: + return false + default: + return true + } } } -func TestReadToEndNoCheckpoint(t *testing.T) { - pageSize := 32 * 1024 - const seriesCount = 10 - const samplesCount = 250 +func logTestWALRecords(t *testing.T, ts time.Time, w *WL, seriesRefOffset, seriesCount, samplesCount, histogramsCount, exemplarsCount int) { + t.Helper() - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - var recs [][]byte - - enc := record.Encoder{} - - for i := 0; i < seriesCount; i++ { - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(i), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - recs = append(recs, series) - for j := 
0; j < samplesCount; j++ { - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(j), - T: int64(i), - V: float64(i), - }, - }, nil) - - recs = append(recs, sample) - - // Randomly batch up records. - if rand.Intn(4) < 3 { - require.NoError(t, w.Log(recs...)) - recs = recs[:0] - } - } - } - require.NoError(t, w.Log(recs...)) - overwriteReadTimeout(t, time.Second) - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - go watcher.Start() - - expected := seriesCount - require.Eventually(t, func() bool { - return wt.checkNumSeries() == expected - }, 20*time.Second, 1*time.Second) - watcher.Stop() - }) - } -} - -func TestReadToEndWithCheckpoint(t *testing.T) { - segmentSize := 32 * 1024 - // We need something similar to this # of series and samples - // in order to get enough segments for us to checkpoint. - const seriesCount = 10 - const samplesCount = 250 - - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, segmentSize, compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - // Write to the initial segment then checkpoint. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - // Add in an unknown record type, which should be ignored. 
- require.NoError(t, w.Log([]byte{255})) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - - Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0) - w.Truncate(1) - - // Write more records after checkpointing. - for i := 0; i < seriesCount; i++ { - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(i), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(j), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - overwriteReadTimeout(t, time.Second) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - go watcher.Start() - - expected := seriesCount * 2 - - require.Eventually(t, func() bool { - return wt.checkNumSeries() == expected - }, 10*time.Second, 1*time.Second) - watcher.Stop() - }) - } -} - -func TestReadCheckpoint(t *testing.T) { - pageSize := 32 * 1024 - const seriesCount = 10 - const samplesCount = 250 - - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - f, err := os.Create(SegmentName(wdir, 30)) - require.NoError(t, err) - require.NoError(t, f.Close()) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, w.Close()) - }) - - // Write to the initial segment then checkpoint. 
- for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - _, err = w.NextSegmentSync() - require.NoError(t, err) - _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0) - require.NoError(t, err) - require.NoError(t, w.Truncate(32)) - - // Start read after checkpoint, no more data written. - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - go watcher.Start() - - expectedSeries := seriesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries - }) - watcher.Stop() - require.Equal(t, expectedSeries, wt.checkNumSeries()) - }) - } -} - -func TestReadCheckpointMultipleSegments(t *testing.T) { - pageSize := 32 * 1024 - - const segments = 1 - const seriesCount = 20 - const samplesCount = 300 - - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, pageSize, compress) - require.NoError(t, err) - - // Write a bunch of data. 
- for i := 0; i < segments; i++ { - for j := 0; j < seriesCount; j++ { - ref := j + (i * 100) - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for k := 0; k < samplesCount; k++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - } - require.NoError(t, w.Close()) - - // At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5. - checkpointDir := dir + "/wal/checkpoint.000004" - err = os.Mkdir(checkpointDir, 0o777) - require.NoError(t, err) - for i := 0; i <= 4; i++ { - err := os.Rename(SegmentName(dir+"/wal", i), SegmentName(checkpointDir, i)) - require.NoError(t, err) - } - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.MaxSegment = -1 - - // Set the Watcher's metrics so they're not nil pointers. - watcher.SetMetrics() - - lastCheckpoint, _, err := LastCheckpoint(watcher.walDir) - require.NoError(t, err) - - err = watcher.readCheckpoint(lastCheckpoint, (*Watcher).readSegment) - require.NoError(t, err) - }) - } -} - -func TestCheckpointSeriesReset(t *testing.T) { - segmentSize := 32 * 1024 - // We need something similar to this # of series and samples - // in order to get enough segments for us to checkpoint. 
- const seriesCount = 20 - const samplesCount = 350 - testCases := []struct { - compress compression.Type - segments int - }{ - {compress: compression.None, segments: 14}, - {compress: compression.Snappy, segments: 13}, - } - - for _, tc := range testCases { - t.Run(fmt.Sprintf("compress=%s", tc.compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, segmentSize, tc.compress) - require.NoError(t, err) - defer func() { - require.NoError(t, w.Close()) - }() - - // Write to the initial segment, then checkpoint later. - for i := 0; i < seriesCount; i++ { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for j := 0; j < samplesCount; j++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - - overwriteReadTimeout(t, time.Second) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.MaxSegment = -1 - go watcher.Start() - - expected := seriesCount - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expected - }) - require.Eventually(t, func() bool { - return wt.checkNumSeries() == seriesCount - }, 10*time.Second, 1*time.Second) - - _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0) - require.NoError(t, err) - - err = w.Truncate(5) - require.NoError(t, err) - - _, cpi, err := LastCheckpoint(path.Join(dir, "wal")) - require.NoError(t, err) - err = watcher.garbageCollectSeries(cpi 
+ 1) - require.NoError(t, err) - - watcher.Stop() - // If you modify the checkpoint and truncate segment #'s run the test to see how - // many series records you end up with and change the last Equals check accordingly - // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) - require.Eventually(t, func() bool { - return wt.checkNumSeries() == tc.segments - }, 20*time.Second, 1*time.Second) - }) - } -} - -func TestRun_StartupTime(t *testing.T) { - const pageSize = 32 * 1024 - const segments = 10 - const seriesCount = 20 - const samplesCount = 300 - - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) - - enc := record.Encoder{} - w, err := NewSize(nil, nil, wdir, pageSize, compress) - require.NoError(t, err) - - for i := 0; i < segments; i++ { - for j := 0; j < seriesCount; j++ { - ref := j + (i * 100) - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for k := 0; k < samplesCount; k++ { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: int64(i), - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - } - } - require.NoError(t, w.Close()) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.MaxSegment = segments - - watcher.SetMetrics() - startTime := time.Now() - - err = watcher.Run() - require.Less(t, time.Since(startTime), readTimeout) - require.NoError(t, err) - }) - } -} - -func generateWALRecords(w *WL, segment, seriesCount, samplesCount int) error { enc := record.Encoder{} - for j := 0; j < seriesCount; j++ { - ref := j + (segment * 100) + + for i := 0; i < 
seriesCount; i++ { + ref := (seriesRefOffset * seriesCount) + i + 100 series := enc.Series([]record.RefSeries{ { Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", segment)), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), }, }, nil) - if err := w.Log(series); err != nil { - return err - } + require.NoError(t, w.Log(series)) - for k := 0; k < samplesCount; k++ { + for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1) sample := enc.Samples([]record.RefSample{ { Ref: chunks.HeadSeriesRef(inner), - T: int64(segment), - V: float64(segment), + T: timestamp.FromTime(ts), + V: float64(i), }, }, nil) - if err := w.Log(sample); err != nil { - return err + require.NoError(t, w.Log(sample)) + } + + for j := 0; j < exemplarsCount; j++ { + inner := rand.Intn(ref + 1) + exemplar := enc.Exemplars([]record.RefExemplar{ + { + Ref: chunks.HeadSeriesRef(inner), + T: timestamp.FromTime(ts), + V: float64(i), + Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)), + }, + }, nil) + require.NoError(t, w.Log(exemplar)) + } + + for j := 0; j < histogramsCount; j++ { + inner := rand.Intn(ref + 1) + hist := &histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{int64(i) + 1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{int64(-i) - 1}, } + + histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: timestamp.FromTime(ts), + H: hist, + }}, nil) + require.NoError(t, w.Log(histograms)) + + customBucketHist := &histogram.Histogram{ + Schema: -53, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + CustomValues: []float64{float64(i) + 2}, + } + + customBucketHistograms := 
enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: timestamp.FromTime(ts), + H: customBucketHist, + }}, nil) + require.NoError(t, w.Log(customBucketHistograms)) + + floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: timestamp.FromTime(ts), + FH: hist.ToFloat(nil), + }}, nil) + require.NoError(t, w.Log(floatHistograms)) + + customBucketFloatHistograms := enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{{ + Ref: chunks.HeadSeriesRef(inner), + T: timestamp.FromTime(ts), + FH: customBucketHist.ToFloat(nil), + }}, nil) + require.NoError(t, w.Log(customBucketFloatHistograms)) } } - return nil + // Add an unknown record type, which should be ignored. + require.NoError(t, w.Log([]byte{255})) } -func TestRun_AvoidNotifyWhenBehind(t *testing.T) { - if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms. - t.SkipNow() - } - const segmentSize = pageSize // Smallest allowed segment size. - const segmentsToWrite = 5 - const segmentsToRead = segmentsToWrite - 1 - const seriesCount = 10 - const samplesCount = 50 +func expectSegments(t *testing.T, wdir string, expected int) { + t.Helper() + first, last, err := Segments(wdir) + require.NoError(t, err) + if first == -1 && last == -1 { + require.Equal(t, expected, 0, "expected different number of segments, got 0") + return + } + require.Equal(t, expected, (last-first)+1, "expected different number of segments, got %v to %v", first, last) +} + +func cutNewSegment(tb testing.TB, w *WL) { + tb.Helper() + + _, err := w.NextSegment() + require.NoError(tb, err) +} + +// TestWatch_Live starts a watcher on an empty WAL and expects it to follow all +// the incoming, live data written the multiple segments. 
+func TestWatch_Live(t *testing.T) { + const ( + seriesCount = 10 + samplesCount = 250 + exemplarsCount = 25 + histogramsCount = 50 + segments = 4 + ) for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { + now := time.Now() dir := t.TempDir() - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) + require.NoError(t, os.Mkdir(wdir, 0o777)) + + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) require.NoError(t, err) + defer func() { + require.NoError(t, w.Close()) + }() - w, err := NewSize(nil, nil, wdir, segmentSize, compress) - require.NoError(t, err) - // Write to 00000000, the watcher will read series from it. - require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount)) - // Create 00000001, the watcher will tail it once started. - w.NextSegment() + wt := newWriteToMock(0) + watcher := newTestWatcher(dir, wt) + // Start time has to be before now to read all samples correctly. + isRunningFn := startWatching(t, watcher, func() time.Time { return now.Add(-1 * time.Millisecond) }) - // Set up the watcher and run it in the background. - wt := newWriteToMock(time.Millisecond) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) - watcher.SetMetrics() - watcher.MaxSegment = segmentsToRead - - var g errgroup.Group - g.Go(func() error { - startTime := time.Now() - err = watcher.Run() - if err != nil { - return err + // Write a few segments. + ts := now + for i := range segments { + logTestWALRecords(t, ts, w, i, seriesCount, samplesCount, histogramsCount, exemplarsCount) + ts = ts.Add(1 * time.Minute) + if i < segments-1 { + cutNewSegment(t, w) } - // If the watcher was to wait for readTicker to read every new segment, it would need readTimeout * segmentsToRead. 
- d := time.Since(startTime) - if d > readTimeout { - return fmt.Errorf("watcher ran for %s, it shouldn't rely on readTicker=%s to read the new segments", d, readTimeout) - } - return nil - }) - - // The watcher went through 00000000 and is tailing the next one. - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() == seriesCount - }) - - // In the meantime, add some new segments in bulk. - // We should end up with segmentsToWrite + 1 segments now. - for i := 1; i < segmentsToWrite; i++ { - require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount)) - w.NextSegment() } + expectSegments(t, wdir, segments) - // Wait for the watcher. - require.NoError(t, g.Wait()) + // Watcher watched from the beginning so expect all data. + wt.expectEventually(t, isRunningFn, + seriesCount*segments, + // Last segment has 2x samples, exemplars and histograms, but due to start time logic + // we expect only half. + seriesCount*samplesCount*segments, + seriesCount*exemplarsCount*segments, + seriesCount*histogramsCount*2*segments, + ) - // All series and samples were read. - require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. - require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) - require.NoError(t, w.Close()) + // Whole test should not wait for emergency read timeout. + require.Less(t, time.Since(now), watcher.readTimeout) }) } } + +// TestWatch_ReplayStartTime starts a watcher on an already filled WAL. We expect it to +// replay series from all segments and replay data from the last segment, respecting +// start time. +// +// After the replay we also test some further live data read. 
+func TestWatch_ReplayStartTime(t *testing.T) {
+	const (
+		seriesCount     = 10
+		samplesCount    = 250
+		exemplarsCount  = 25
+		histogramsCount = 50
+		initialSegments = 4
+	)
+	for _, compress := range compression.Types() {
+		t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+			now := time.Now()
+			dir := t.TempDir()
+			wdir := path.Join(dir, "wal")
+			require.NoError(t, os.Mkdir(wdir, 0o777))
+
+			w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, w.Close())
+			}()
+
+			// Write a few segments.
+			ts := now
+			for i := range initialSegments {
+				logTestWALRecords(t, ts, w, i, seriesCount, samplesCount, histogramsCount, exemplarsCount)
+				ts = ts.Add(1 * time.Minute)
+				if i < initialSegments-1 {
+					cutNewSegment(t, w)
+				}
+			}
+			expectSegments(t, wdir, initialSegments)
+
+			// For the last segment, log more entries, with a newer timestamp, to test the start time logic.
+			// The batch already in this segment was stamped ts-1m and this one is stamped ts+1m,
+			// so a start time of ts falls between the two.
+			logTestWALRecords(t, ts.Add(1*time.Minute), w, initialSegments+1, seriesCount, samplesCount, histogramsCount, exemplarsCount)
+			expectSegments(t, wdir, initialSegments) // Still the same last segment.
+
+			// Create a watcher that should replay series and the last segment data.
+			// Set start time to ts, so we expect only half of the last segment to be replayed.
+			wt := newWriteToMock(0)
+			watcher := newTestWatcher(dir, wt)
+			isRunningFn := startWatching(t, watcher, func() time.Time { return ts })
+
+			wt.expectEventually(t, isRunningFn,
+				seriesCount*(initialSegments+1),
+				// Last segment has 2x samples, exemplars and histograms, but due to start time logic
+				// we expect only half.
+				seriesCount*samplesCount,
+				// TODO(bwplotka): Start time does not apply on exemplars, should it?
+				seriesCount*exemplarsCount*2,
+				seriesCount*histogramsCount*2,
+			)
+
+			// Cut a new segment and log new data.
+			cutNewSegment(t, w)
+			logTestWALRecords(t, ts.Add(2*time.Minute), w, initialSegments+2, seriesCount, samplesCount, histogramsCount, exemplarsCount)
+			watcher.Notify()
+
+			wt.expectEventually(t, isRunningFn,
+				seriesCount*(initialSegments+2),
+				seriesCount*samplesCount*2,
+				// TODO(bwplotka): Start time does not apply on exemplars, should it?
+				seriesCount*exemplarsCount*3,
+				seriesCount*histogramsCount*2*2,
+			)
+
+			// Whole test should not wait for emergency read timeout.
+			require.Less(t, time.Since(now), watcher.readTimeout)
+		})
+	}
+}
+
+// TestWatch_ReplayWithCheckpoint starts a watcher on an already filled WAL with
+// a checkpoint. We expect it to replay series from all checkpoints and segments
+// and replay data from the last segment.
+//
+// After the replay we also test a further checkpoint taken while the watcher is running.
+func TestWatch_ReplayWithCheckpoint(t *testing.T) {
+	const (
+		seriesCount     = 10
+		samplesCount    = 250
+		exemplarsCount  = 25
+		histogramsCount = 50
+		initialSegments = 5
+	)
+	for _, compress := range compression.Types() {
+		t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+			now := time.Now()
+			dir := t.TempDir()
+			wdir := path.Join(dir, "wal")
+			require.NoError(t, os.Mkdir(wdir, 0o777))
+
+			w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, w.Close())
+			}()
+
+			// Write a few segments.
+			ts := now
+			for i := range initialSegments {
+				logTestWALRecords(t, ts, w, i, seriesCount, samplesCount, histogramsCount, exemplarsCount)
+				ts = ts.Add(1 * time.Minute)
+				if i < initialSegments-1 {
+					cutNewSegment(t, w)
+				}
+			}
+			expectSegments(t, wdir, initialSegments)
+
+			// First checkpoint: the keep callback returns true for every series, so presumably
+			// all series are retained (confirm against Checkpoint's contract).
+			_, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(_ chunks.HeadSeriesRef, _ int) bool { return true }, 0)
+			require.NoError(t, err)
+			require.NoError(t, w.Truncate(2))
+			expectSegments(t, wdir, initialSegments-2) // We should see 3 segments as the first 2 were truncated.
+
+			// Create a watcher that should replay series and the last segment data.
+			wt := newWriteToMock(0)
+			watcher := newTestWatcher(dir, wt)
+			watcher.checkpointPeriod = 500 * time.Millisecond // Checkpoint period is long-ish (best effort), make our tests faster.
+			isRunningFn := startWatching(t, watcher, func() time.Time { return now })
+
+			wt.expectEventually(t, isRunningFn,
+				seriesCount*initialSegments,
+				seriesCount*samplesCount,
+				// TODO(bwplotka): Start time does not apply on exemplars, should it?
+				seriesCount*exemplarsCount,
+				seriesCount*histogramsCount*2,
+			)
+
+			// During watcher routine, do another checkpoint which deletes all previous series.
+			// Then we truncate. We expect watcher to reset the series eventually (GC routine).
+			_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 3, func(chunks.HeadSeriesRef, int) bool { return false }, 0)
+			require.NoError(t, err)
+			err = w.Truncate(4)
+			require.NoError(t, err)
+			expectSegments(t, wdir, 1)
+
+			wt.expectEventually(t, isRunningFn,
+				seriesCount*2,
+				seriesCount*samplesCount,
+				// TODO(bwplotka): Start time does not apply on exemplars, should it?
+				seriesCount*exemplarsCount,
+				seriesCount*histogramsCount*2,
+			)
+
+			// Whole test should not wait for emergency read timeout.
+			require.Less(t, time.Since(now), watcher.readTimeout)
+		})
+	}
+}
+
+// TestWatch_EmergencyReadTimeout ensures we will read even if we miss notification.
+func TestWatch_EmergencyReadTimeout(t *testing.T) {
+	const (
+		seriesCount     = 10
+		samplesCount    = 250
+		exemplarsCount  = 25
+		histogramsCount = 50
+	)
+	for _, compress := range compression.Types() {
+		t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+			now := time.Now()
+			dir := t.TempDir()
+			wdir := path.Join(dir, "wal")
+			require.NoError(t, os.Mkdir(wdir, 0o777))
+
+			w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, w.Close())
+			}()
+
+			// Write to a segment.
+			ts := now
+			logTestWALRecords(t, ts, w, 0, seriesCount, samplesCount, histogramsCount, exemplarsCount)
+			ts = ts.Add(1 * time.Minute)
+
+			// Start watcher.
+			wt := newWriteToMock(0)
+			watcher := newTestWatcher(dir, wt)
+			watcher.readTimeout = 200 * time.Millisecond // Make our test faster for the expected case.
+
+			// Start time has to be before now to read all samples correctly.
+			isRunningFn := startWatching(t, watcher, func() time.Time { return now.Add(-1 * time.Millisecond) })
+
+			// Write to the same segment, without notification. This will rely on readTimeout.
+			logTestWALRecords(t, ts, w, 1, seriesCount, samplesCount, histogramsCount, exemplarsCount)
+			expectSegments(t, wdir, 1)
+
+			// We expect data from both writes to the single segment, including the one made without a notification.
+			wt.expectEventually(t, isRunningFn,
+				seriesCount*2,
+				seriesCount*samplesCount*2,
+				seriesCount*exemplarsCount*2,
+				seriesCount*histogramsCount*2*2,
+			)
+		})
+	}
+}
+
+// logBenchWALRealisticRecords fills w with benchmark input: first seriesRecords
+// series records of seriesPerRecord series each (refs are consecutive, starting
+// at 0), then sampleRecords sample records, each produced by
+// testrecord.GenTestRefSamplesCase for samplesCase.
+//
+// NOTE: every series is logged with the same label set ("metric_0" plus fixed
+// extra labels); only the ref differs.
+func logBenchWALRealisticRecords(tb *testing.B, w *WL, seriesRecords, seriesPerRecord, sampleRecords int, samplesCase testrecord.RefSamplesCase) {
+	tb.Helper()
+
+	enc := record.Encoder{}
+	for i := range seriesRecords {
+		series := make([]record.RefSeries, seriesPerRecord)
+		for j := range seriesPerRecord {
+			series[j] = record.RefSeries{
+				Ref:    chunks.HeadSeriesRef(i*seriesPerRecord + j),
+				Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", 0), "foo", "bar", "foo1", "bar2", "sdfsasdgfadsfgaegga", "dgsfzdsf§sfawf2"),
+			}
+		}
+		rec := enc.Series(series, nil)
+		require.NoError(tb, w.Log(rec))
+	}
+	for i := 0; i < sampleRecords; i++ {
+		rec := enc.Samples(testrecord.GenTestRefSamplesCase(tb, samplesCase), nil)
+		require.NoError(tb, w.Log(rec))
+	}
+}
+
+// BenchmarkWatcherReadSegment benchmarks (*Watcher).readSegmentSeries and
+// (*Watcher).readSegment on segment 0 of a WAL filled by
+// logBenchWALRealisticRecords, for every compression type and both
+// testrecord sample cases. Each sub-benchmark runs the read once to validate
+// the counts recorded by the WriteTo mock before the timer is reset.
+/*
+	export bench=watcher-read-v1 && go test ./tsdb/wlog/... \
+		-run '^$' -bench '^BenchmarkWatcherReadSegment' \
+		-benchtime 5s -count 6 -cpu 2 -timeout 999m \
+		| tee ${bench}.txt
+*/
+func BenchmarkWatcherReadSegment(b *testing.B) {
+	const (
+		seriesRecords   = 100           // Targets * Scrapes
+		seriesPerRecord = 10            // New series per scrape.
+		sampleRecords   = seriesRecords // Targets * Scrapes
+	)
+	for _, compress := range compression.Types() {
+		for _, data := range []testrecord.RefSamplesCase{
+			testrecord.Realistic1000Samples,
+			testrecord.WorstCase1000Samples,
+		} {
+			b.Run(fmt.Sprintf("compr=%v/data=%v", compress, data), func(b *testing.B) {
+				dir := b.TempDir()
+				wdir := path.Join(dir, "wal")
+				require.NoError(b, os.Mkdir(wdir, 0o777))
+
+				w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+				require.NoError(b, err)
+				defer func() {
+					require.NoError(b, w.Close())
+				}()
+
+				logBenchWALRealisticRecords(b, w, seriesRecords, seriesPerRecord, sampleRecords, data)
+				// // Build segment.
+				// require.NoError(tb, w.flushPage(true))
+				logger := promslog.NewNopLogger()
+
+				b.Run("func=readSegmentSeries", func(b *testing.B) {
+					benchmarkedReadFn := (*Watcher).readSegmentSeries
+
+					wt := newWriteToMock(0)
+					watcher := newTestWatcher(dir, wt)
+					// Required as we don't use public method, but invoke readSegment* directly.
+					watcher.initMetrics()
+
+					// Validate our test data first.
+					testReadFn(b, wdir, 0, logger, watcher, benchmarkedReadFn)
+					require.Equal(b, seriesRecords*seriesPerRecord, wt.seriesStored())
+					require.Equal(b, 0, wt.samplesAppended) // ReadSegmentSeries skips non-series.
+
+					b.ReportAllocs()
+					b.ResetTimer()
+					for i := 0; i < b.N; i++ {
+						testReadFn(b, wdir, 0, logger, watcher, benchmarkedReadFn)
+					}
+				})
+				b.Run("func=readSegment", func(b *testing.B) {
+					benchmarkedReadFn := func(w *Watcher, r *LiveReader, segmentNum int) error {
+						// StartTime being ultra low is required as WorstCase1000Samples have
+						// math.MinInt32 timestamps (for compression overhead).
+						return w.readSegment(r, math.MinInt32-1, segmentNum)
+					}
+
+					wt := newWriteToMock(0)
+					watcher := newTestWatcher(dir, wt)
+					// Required as we don't use public method, but invoke readSegment* directly.
+					watcher.initMetrics()
+
+					// Validate our test data first.
+					testReadFn(b, wdir, 0, logger, watcher, benchmarkedReadFn)
+					require.Equal(b, seriesRecords*seriesPerRecord, wt.seriesStored())
+					require.Equal(b, sampleRecords*1000, wt.samplesAppended)
+
+					b.ReportAllocs()
+					b.ResetTimer()
+					for i := 0; i < b.N; i++ {
+						testReadFn(b, wdir, 0, logger, watcher, benchmarkedReadFn)
+					}
+				})
+			})
+		}
+	}
+}
+
+// testReadFn opens segment segNum under wdir, wraps it in a LiveReader that uses
+// watcher.readerMetrics, and requires fn(watcher, reader, segNum) to return no error.
+//
+// NOTE(review): the opened segment is not closed here; on the benchmark paths
+// this runs once per iteration. Confirm whether an explicit Close is wanted.
+func testReadFn(tb testing.TB, wdir string, segNum int, logger *slog.Logger, watcher *Watcher, fn segmentReadFn) {
+	tb.Helper()
+
+	segment, err := OpenReadSegment(SegmentName(wdir, segNum))
+	require.NoError(tb, err)
+
+	r := NewLiveReader(logger, watcher.readerMetrics, segment)
+	require.NoError(tb, fn(watcher, r, segNum))
+}
diff --git a/util/testrecord/record.go b/util/testrecord/record.go
new file mode 100644
index 0000000000..b8ad683887
--- /dev/null
+++ b/util/testrecord/record.go
@@ -0,0 +1,69 @@
+// Copyright 2025 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package testrecord provides generators of record.RefSample test data.
+package testrecord
+
+import (
+	"math"
+	"testing"
+
+	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/record"
+)
+
+// RefSamplesCase names a sample data set that GenTestRefSamplesCase can generate.
+type RefSamplesCase string
+
+const (
+	// Realistic1000Samples selects 1000 samples that share one constant
+	// timestamp and have values alternating between two extremes.
+	Realistic1000Samples RefSamplesCase = "real1000"
+	// WorstCase1000Samples selects 1000 samples whose timestamps and values
+	// both alternate between two extremes.
+	WorstCase1000Samples RefSamplesCase = "worst1000"
+)
+
+// GenTestRefSamplesCase returns 1000 samples for case c; the i-th sample has
+// Ref i. It fails the test via t.Fatal when c is not a known case.
+func GenTestRefSamplesCase(t testing.TB, c RefSamplesCase) []record.RefSample {
+	t.Helper()
+
+	ret := make([]record.RefSample, 1e3)
+	switch c {
+	case Realistic1000Samples:
+		for i := range ret {
+			ret[i].Ref = chunks.HeadSeriesRef(i)
+			ret[i].T = 12423423
+			ret[i].V = highVarianceFloat(i)
+		}
+	case WorstCase1000Samples:
+		for i := range ret {
+			ret[i].Ref = chunks.HeadSeriesRef(i)
+
+			// Worst case is when the values are significantly different
+			// from each other, which breaks delta encoding.
+			ret[i].T = highVarianceInt(i)
+			ret[i].V = highVarianceFloat(i)
+		}
+	default:
+		t.Fatal("unknown case", c)
+	}
+	return ret
+}
+
+// highVarianceInt returns math.MinInt32 for even i and math.MaxInt32 for odd i.
+func highVarianceInt(i int) int64 {
+	if i%2 == 0 {
+		return math.MinInt32
+	}
+	return math.MaxInt32
+}
+
+// highVarianceFloat returns math.SmallestNonzeroFloat32 for even i and
+// math.MaxFloat32 for odd i.
+func highVarianceFloat(i int) float64 {
+	if i%2 == 0 {
+		return math.SmallestNonzeroFloat32
+	}
+	return math.MaxFloat32
+}