diff --git a/tsdb/agent/db_append_v2.go b/tsdb/agent/db_append_v2.go new file mode 100644 index 0000000000..5c9774cd58 --- /dev/null +++ b/tsdb/agent/db_append_v2.go @@ -0,0 +1,1292 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "errors" + "fmt" + "log/slog" + "math" + "path/filepath" + "sync" + "time" + "unicode/utf8" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "go.uber.org/atomic" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/prometheus/util/zeropool" +) + +const ( + sampleMetricTypeFloat = "float" + sampleMetricTypeHistogram = "histogram" +) + +var ErrUnsupported = errors.New("unsupported operation with WAL-only storage") + +// Default values for options. +var ( + DefaultTruncateFrequency = 2 * time.Hour + DefaultMinWALTime = int64(5 * time.Minute / time.Millisecond) + DefaultMaxWALTime = int64(4 * time.Hour / time.Millisecond) +) + +// Options of the WAL storage. +type Options struct { + // Segments (wal files) max size. + // WALSegmentSize <= 0, segment size is default size. + // WALSegmentSize > 0, segment size is WALSegmentSize. + WALSegmentSize int + + // WALCompression configures the compression type to use on records in the WAL. + WALCompression compression.Type + + // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. + StripeSize int + + // TruncateFrequency determines how frequently to truncate data from the WAL. + TruncateFrequency time.Duration + + // Shortest and longest amount of time data can exist in the WAL before being + // deleted. + MinWALTime, MaxWALTime int64 + + // NoLockfile disables creation and consideration of a lock file. + NoLockfile bool + + // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. + OutOfOrderTimeWindow int64 +} + +// DefaultOptions used for the WAL storage. They are reasonable for setups using +// millisecond-precision timestamps. +func DefaultOptions() *Options { + return &Options{ + WALSegmentSize: wlog.DefaultSegmentSize, + WALCompression: compression.None, + StripeSize: tsdb.DefaultStripeSize, + TruncateFrequency: DefaultTruncateFrequency, + MinWALTime: DefaultMinWALTime, + MaxWALTime: DefaultMaxWALTime, + NoLockfile: false, + OutOfOrderTimeWindow: 0, + } +} + +type dbMetrics struct { + r prometheus.Registerer + + numActiveSeries prometheus.Gauge + numWALSeriesPendingDeletion prometheus.Gauge + totalAppendedSamples *prometheus.CounterVec + totalAppendedExemplars prometheus.Counter + totalOutOfOrderSamples prometheus.Counter + walTruncateDuration prometheus.Summary + walCorruptionsTotal prometheus.Counter + walTotalReplayDuration prometheus.Gauge + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter +} + +func newDBMetrics(r prometheus.Registerer) *dbMetrics { + m := dbMetrics{r: r} + m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_agent_active_series", + Help: "Number of active series being tracked by the WAL storage", + }) + + m.numWALSeriesPendingDeletion = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_agent_deleted_series", + Help: "Number of series pending deletion from the WAL", + }) + + m.totalAppendedSamples = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_agent_samples_appended_total", + Help: "Total number of samples appended to the storage", + }, []string{"type"}) + + m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_exemplars_appended_total", + Help: "Total number of exemplars appended to the storage", + }) + + m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_out_of_order_samples_total", + Help: "Total number of out of order samples ingestion failed attempts.", + }) + + m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_agent_truncate_duration_seconds", + Help: "Duration of WAL truncation.", + }) + + m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_corruptions_total", + Help: "Total number of WAL corruptions.", + }) + + m.walTotalReplayDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_agent_data_replay_duration_seconds", + Help: "Time taken to replay the data on disk.", + }) + + m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_deletions_failed_total", + Help: "Total number of checkpoint deletions that failed.", + }) + + m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_deletions_total", + Help: "Total number of checkpoint deletions attempted.", + }) + + m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_creations_failed_total", + Help: "Total number of checkpoint creations that failed.", + }) + + m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_creations_total", + Help: "Total number of checkpoint creations attempted.", + }) + + if r != nil { + r.MustRegister( + m.numActiveSeries, + m.numWALSeriesPendingDeletion, + m.totalAppendedSamples, + m.totalAppendedExemplars, + m.totalOutOfOrderSamples, + m.walTruncateDuration, + m.walCorruptionsTotal, + m.walTotalReplayDuration, + m.checkpointDeleteFail, + m.checkpointDeleteTotal, + m.checkpointCreationFail, + m.checkpointCreationTotal, + ) + } + + return &m +} + +func (m *dbMetrics) Unregister() { + if m.r == nil { + return + } + cs := []prometheus.Collector{ + m.numActiveSeries, + m.numWALSeriesPendingDeletion, + m.totalAppendedSamples, + m.totalAppendedExemplars, + m.totalOutOfOrderSamples, + m.walTruncateDuration, + m.walCorruptionsTotal, + m.walTotalReplayDuration, + m.checkpointDeleteFail, + m.checkpointDeleteTotal, + m.checkpointCreationFail, + m.checkpointCreationTotal, + } + for _, c := range cs { + m.r.Unregister(c) + } +} + +// DB represents a WAL-only storage. It implements storage.DB. +type DB struct { + mtx sync.RWMutex + logger *slog.Logger + opts *Options + rs *remote.Storage + + wal *wlog.WL + locker *tsdbutil.DirLocker + + appenderPool sync.Pool + bufPool sync.Pool + + // These pools are only used during WAL replay and are reset at the end. + // NOTE: Adjust resetWALReplayResources() upon changes to the pools. + walReplaySeriesPool zeropool.Pool[[]record.RefSeries] + walReplaySamplesPool zeropool.Pool[[]record.RefSample] + walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample] + walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] + + nextRef *atomic.Uint64 + series *stripeSeries + // deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they + // must be kept around to). + deleted map[chunks.HeadSeriesRef]int + + donec chan struct{} + stopc chan struct{} + + writeNotified wlog.WriteNotified + + metrics *dbMetrics +} + +// Open returns a new agent.DB in the given directory. +func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) { + opts = validateOptions(opts) + + locker, err := tsdbutil.NewDirLocker(dir, "agent", l, reg) + if err != nil { + return nil, err + } + if !opts.NoLockfile { + if err := locker.Lock(); err != nil { + return nil, err + } + } + + // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. + dir = filepath.Join(dir, "wal") + + w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) + if err != nil { + return nil, fmt.Errorf("creating WAL: %w", err) + } + + db := &DB{ + logger: l, + opts: opts, + rs: rs, + + wal: w, + locker: locker, + + nextRef: atomic.NewUint64(0), + series: newStripeSeries(opts.StripeSize), + deleted: make(map[chunks.HeadSeriesRef]int), + + donec: make(chan struct{}), + stopc: make(chan struct{}), + + metrics: newDBMetrics(reg), + } + + db.bufPool.New = func() any { + return make([]byte, 0, 1024) + } + + db.appenderPool.New = func() any { + return &appender{ + DB: db, + pendingSeries: make([]record.RefSeries, 0, 100), + pendingSamples: make([]record.RefSample, 0, 100), + pendingHistograms: make([]record.RefHistogramSample, 0, 100), + pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100), + pendingExamplars: make([]record.RefExemplar, 0, 10), + } + } + + if err := db.replayWAL(); err != nil { + db.logger.Warn("encountered WAL read error, attempting repair", "err", err) + if err := w.Repair(err); err != nil { + return nil, fmt.Errorf("repair corrupted WAL: %w", err) + } + db.logger.Info("successfully repaired WAL") + } + + go db.run() + return db, nil +} + +// SetWriteNotified allows to set an instance to notify when a write happens. +// It must be used during initialization. It is not safe to use it during execution. +func (db *DB) SetWriteNotified(wn wlog.WriteNotified) { + db.writeNotified = wn +} + +func validateOptions(opts *Options) *Options { + if opts == nil { + opts = DefaultOptions() + } + if opts.WALSegmentSize <= 0 { + opts.WALSegmentSize = wlog.DefaultSegmentSize + } + + if opts.WALCompression == "" { + opts.WALCompression = compression.None + } + + // Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2. + if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) { + opts.StripeSize = tsdb.DefaultStripeSize + } + if opts.TruncateFrequency <= 0 { + opts.TruncateFrequency = DefaultTruncateFrequency + } + if opts.MinWALTime <= 0 { + opts.MinWALTime = DefaultMinWALTime + } + if opts.MaxWALTime <= 0 { + opts.MaxWALTime = DefaultMaxWALTime + } + if opts.MinWALTime > opts.MaxWALTime { + opts.MaxWALTime = opts.MinWALTime + } + + if t := int64(opts.TruncateFrequency / time.Millisecond); opts.MaxWALTime < t { + opts.MaxWALTime = t + } + return opts +} + +func (db *DB) replayWAL() error { + db.logger.Info("replaying WAL, this may take a while", "dir", db.wal.Dir()) + defer db.resetWALReplayResources() + start := time.Now() + + dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir()) + if err != nil && !errors.Is(err, record.ErrNotFound) { + return fmt.Errorf("find last checkpoint: %w", err) + } + + multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} + + if err == nil { + sr, err := wlog.NewSegmentsReader(dir) + if err != nil { + return fmt.Errorf("open checkpoint: %w", err) + } + defer func() { + if err := sr.Close(); err != nil { + db.logger.Warn("error while closing the wal segments reader", "err", err) + } + }() + + // A corrupted checkpoint is a hard error for now and requires user + // intervention. There's likely little data that can be recovered anyway. + if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil { + return fmt.Errorf("backfill checkpoint: %w", err) + } + startFrom++ + db.logger.Info("WAL checkpoint loaded") + } + + // Find the last segment. + _, last, err := wlog.Segments(db.wal.Dir()) + if err != nil { + return fmt.Errorf("finding WAL segments: %w", err) + } + + // Backfill segments from the most recent checkpoint onwards. + for i := startFrom; i <= last; i++ { + seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i)) + if err != nil { + return fmt.Errorf("open WAL segment: %d: %w", i, err) + } + + sr := wlog.NewSegmentBufReader(seg) + err = db.loadWAL(wlog.NewReader(sr), multiRef) + if err := sr.Close(); err != nil { + db.logger.Warn("error while closing the wal segments reader", "err", err) + } + if err != nil { + return err + } + db.logger.Info("WAL segment loaded", "segment", i, "maxSegment", last) + } + + walReplayDuration := time.Since(start) + db.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds()) + + return nil +} + +func (db *DB) resetWALReplayResources() { + db.walReplaySeriesPool = zeropool.Pool[[]record.RefSeries]{} + db.walReplaySamplesPool = zeropool.Pool[[]record.RefSample]{} + db.walReplayHistogramsPool = zeropool.Pool[[]record.RefHistogramSample]{} + db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{} +} + +func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) { + var ( + syms = labels.NewSymbolTable() // One table for the whole WAL. + dec = record.NewDecoder(syms, db.logger) + lastRef = chunks.HeadSeriesRef(db.nextRef.Load()) + + decoded = make(chan any, 10) + errCh = make(chan error, 1) + ) + + go func() { + defer close(decoded) + var err error + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series := db.walReplaySeriesPool.Get()[:0] + series, err = dec.Series(rec, series) + if err != nil { + errCh <- &wlog.CorruptionErr{ + Err: fmt.Errorf("decode series: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- series + case record.Samples: + samples := db.walReplaySamplesPool.Get()[:0] + samples, err = dec.Samples(rec, samples) + if err != nil { + errCh <- &wlog.CorruptionErr{ + Err: fmt.Errorf("decode samples: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- samples + case record.HistogramSamples, record.CustomBucketsHistogramSamples: + histograms := db.walReplayHistogramsPool.Get()[:0] + histograms, err = dec.HistogramSamples(rec, histograms) + if err != nil { + errCh <- &wlog.CorruptionErr{ + Err: fmt.Errorf("decode histogram samples: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- histograms + case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: + floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0] + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + if err != nil { + errCh <- &wlog.CorruptionErr{ + Err: fmt.Errorf("decode float histogram samples: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- floatHistograms + case record.Tombstones, record.Exemplars: + // We don't care about tombstones or exemplars during replay. + // TODO: If decide to decode exemplars, we should make sure to prepopulate + // stripeSeries.exemplars in the next block by using setLatestExemplar. + continue + default: + errCh <- &wlog.CorruptionErr{ + Err: fmt.Errorf("invalid record type %v", dec.Type(rec)), + Segment: r.Segment(), + Offset: r.Offset(), + } + } + } + }() + + var nonExistentSeriesRefs atomic.Uint64 + + for d := range decoded { + switch v := d.(type) { + case []record.RefSeries: + for _, entry := range v { + // If this is a new series, create it in memory. If we never read in a + // sample for this series, its timestamp will remain at 0 and it will + // be deleted at the next GC. + if db.series.GetByID(entry.Ref) == nil { + series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0} + db.series.Set(entry.Labels.Hash(), series) + multiRef[entry.Ref] = series.ref + db.metrics.numActiveSeries.Inc() + if entry.Ref > lastRef { + lastRef = entry.Ref + } + } + } + db.walReplaySeriesPool.Put(v) + case []record.RefSample: + for _, entry := range v { + // Update the lastTs for the series based + ref, ok := multiRef[entry.Ref] + if !ok { + nonExistentSeriesRefs.Inc() + continue + } + series := db.series.GetByID(ref) + if entry.T > series.lastTs { + series.lastTs = entry.T + } + } + db.walReplaySamplesPool.Put(v) + case []record.RefHistogramSample: + for _, entry := range v { + // Update the lastTs for the series based + ref, ok := multiRef[entry.Ref] + if !ok { + nonExistentSeriesRefs.Inc() + continue + } + series := db.series.GetByID(ref) + if entry.T > series.lastTs { + series.lastTs = entry.T + } + } + db.walReplayHistogramsPool.Put(v) + case []record.RefFloatHistogramSample: + for _, entry := range v { + // Update the lastTs for the series based + ref, ok := multiRef[entry.Ref] + if !ok { + nonExistentSeriesRefs.Inc() + continue + } + series := db.series.GetByID(ref) + if entry.T > series.lastTs { + series.lastTs = entry.T + } + } + db.walReplayFloatHistogramsPool.Put(v) + default: + panic(fmt.Errorf("unexpected decoded type: %T", d)) + } + } + + if v := nonExistentSeriesRefs.Load(); v > 0 { + db.logger.Warn("found sample referencing non-existing series", "skipped_series", v) + } + + db.nextRef.Store(uint64(lastRef)) + + select { + case err := <-errCh: + return err + default: + if r.Err() != nil { + return fmt.Errorf("read records: %w", r.Err()) + } + return nil + } +} + +func (db *DB) run() { + defer close(db.donec) + +Loop: + for { + select { + case <-db.stopc: + break Loop + case <-time.After(db.opts.TruncateFrequency): + // The timestamp ts is used to determine which series are not receiving + // samples and may be deleted from the WAL. Their most recent append + // timestamp is compared to ts, and if that timestamp is older then ts, + // they are considered inactive and may be deleted. + // + // Subtracting a duration from ts will add a buffer for when series are + // considered inactive and safe for deletion. + ts := max(db.rs.LowestSentTimestamp()-db.opts.MinWALTime, 0) + + // Network issues can prevent the result of getRemoteWriteTimestamp from + // changing. We don't want data in the WAL to grow forever, so we set a cap + // on the maximum age data can be. If our ts is older than this cutoff point, + // we'll shift it forward to start deleting very stale data. + if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS { + ts = maxTS + } + + db.logger.Debug("truncating the WAL", "ts", ts) + if err := db.truncate(ts); err != nil { + db.logger.Warn("failed to truncate WAL", "err", err) + } + } + } +} + +// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. +// last is the last WAL segment that was considered for checkpointing. +// NOTE: the agent implementation here is different from the Prometheus implementation, in that it uses WAL segment numbers instead of timestamps. +func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef) bool { + return func(id chunks.HeadSeriesRef) bool { + // Keep the record if the series exists in the db. + if db.series.GetByID(id) != nil { + return true + } + + // Keep the record if the series was recently deleted. + seg, ok := db.deleted[id] + return ok && seg > last + } +} + +func (db *DB) truncate(mint int64) error { + db.logger.Info("series GC started") + db.mtx.RLock() + defer db.mtx.RUnlock() + + start := time.Now() + + db.gc(mint) + db.logger.Info("series GC completed", "duration", time.Since(start)) + + first, last, err := wlog.Segments(db.wal.Dir()) + if err != nil { + return fmt.Errorf("get segment range: %w", err) + } + + // Start a new segment so low ingestion volume instances don't have more WAL + // than needed. + if _, err := db.wal.NextSegment(); err != nil { + return fmt.Errorf("next segment: %w", err) + } + + last-- // Never consider most recent segment for checkpoint + if last < 0 { + return nil // no segments yet + } + + // The lower two-thirds of segments should contain mostly obsolete samples. + // If we have less than two segments, it's not worth checkpointing yet. + last = first + (last-first)*2/3 + if last <= first { + return nil + } + + db.metrics.checkpointCreationTotal.Inc() + + if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint); err != nil { + db.metrics.checkpointCreationFail.Inc() + var cerr *wlog.CorruptionErr + if errors.As(err, &cerr) { + db.metrics.walCorruptionsTotal.Inc() + } + return fmt.Errorf("create checkpoint: %w", err) + } + if err := db.wal.Truncate(last + 1); err != nil { + // If truncating fails, we'll just try it again at the next checkpoint. + // Leftover segments will still just be ignored in the future if there's a + // checkpoint that supersedes them. + db.logger.Error("truncating segments failed", "err", err) + } + + // The checkpoint is written and segments before it are truncated, so we + // no longer need to track deleted series that were being kept around. + for ref, segment := range db.deleted { + if segment <= last { + delete(db.deleted, ref) + } + } + db.metrics.checkpointDeleteTotal.Inc() + db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) + + if err := wlog.DeleteCheckpoints(db.wal.Dir(), last); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. They will just be ignored since a newer checkpoint + // exists. + db.logger.Error("delete old checkpoints", "err", err) + db.metrics.checkpointDeleteFail.Inc() + } + + db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + + db.logger.Info("WAL checkpoint complete", "first", first, "last", last, "duration", time.Since(start)) + return nil +} + +// gc marks ref IDs that have not received a sample since mint as deleted in +// s.deleted, along with the segment where they originally got deleted. +func (db *DB) gc(mint int64) { + deleted := db.series.GC(mint) + db.metrics.numActiveSeries.Sub(float64(len(deleted))) + + _, last, _ := wlog.Segments(db.wal.Dir()) + + // We want to keep series records for any newly deleted series + // until we've passed the last recorded segment. This prevents + // the WAL having samples for series records that no longer exist. + for ref := range deleted { + db.deleted[ref] = last + } + + db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) +} + +// StartTime implements the Storage interface. +func (*DB) StartTime() (int64, error) { + return int64(model.Latest), nil +} + +// Querier implements the Storage interface. +func (*DB) Querier(int64, int64) (storage.Querier, error) { + return nil, ErrUnsupported +} + +// ChunkQuerier implements the Storage interface. +func (*DB) ChunkQuerier(int64, int64) (storage.ChunkQuerier, error) { + return nil, ErrUnsupported +} + +// ExemplarQuerier implements the Storage interface. +func (*DB) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) { + return nil, ErrUnsupported +} + +// Appender implements storage.Storage. +func (db *DB) Appender(context.Context) storage.Appender { + return db.appenderPool.Get().(storage.Appender) +} + +// Close implements the Storage interface. +func (db *DB) Close() error { + db.mtx.Lock() + defer db.mtx.Unlock() + + close(db.stopc) + <-db.donec + + db.metrics.Unregister() + + return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err() +} + +type appender struct { + *DB + hints *storage.AppendOptions + + pendingSeries []record.RefSeries + pendingSamples []record.RefSample + pendingHistograms []record.RefHistogramSample + pendingFloatHistograms []record.RefFloatHistogramSample + pendingExamplars []record.RefExemplar + + // Pointers to the series referenced by each element of pendingSamples. + // Series lock is not held on elements. + sampleSeries []*memSeries + + // Pointers to the series referenced by each element of pendingHistograms. + // Series lock is not held on elements. + histogramSeries []*memSeries + + // Pointers to the series referenced by each element of pendingFloatHistograms. + // Series lock is not held on elements. + floatHistogramSeries []*memSeries +} + +func (a *appender) SetOptions(opts *storage.AppendOptions) { + a.hints = opts +} + +func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + // series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) + + series := a.series.GetByID(headRef) + if series == nil { + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + + a.metrics.numActiveSeries.Inc() + } + } + + series.Lock() + defer series.Unlock() + + if t <= a.minValidTime(series.lastTs) { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + + // NOTE: always modify pendingSamples and sampleSeries together. + a.pendingSamples = append(a.pendingSamples, record.RefSample{ + Ref: series.ref, + T: t, + V: v, + }) + a.sampleSeries = append(a.sampleSeries, series) + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc() + return storage.SeriesRef(series.ref), nil +} + +func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) { + hash := l.Hash() + + series = a.series.GetByHash(hash, l) + if series != nil { + return series, false + } + + ref := chunks.HeadSeriesRef(a.nextRef.Inc()) + series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64} + a.series.Set(hash, series) + return series, true +} + +func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + // Series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) + + s := a.series.GetByID(headRef) + if s == nil { + return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", ref) + } + + // Ensure no empty labels have gotten through. + e.Labels = e.Labels.WithoutEmpty() + + if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar) + } + + // Exemplar label length does not include chars involved in text rendering such as quotes + // equals sign, or commas. See definition of const ExemplarMaxLabelLength. + labelSetLen := 0 + err := e.Labels.Validate(func(l labels.Label) error { + labelSetLen += utf8.RuneCountInString(l.Name) + labelSetLen += utf8.RuneCountInString(l.Value) + + if labelSetLen > exemplar.ExemplarMaxLabelSetLength { + return storage.ErrExemplarLabelLength + } + return nil + }) + if err != nil { + return 0, err + } + + // Check for duplicate vs last stored exemplar for this series, and discard those. + // Otherwise, record the current exemplar as the latest. + // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here. + prevExemplar := a.series.GetLatestExemplar(s.ref) + if prevExemplar != nil && prevExemplar.Equals(e) { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + a.series.SetLatestExemplar(s.ref, &e) + + a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ + Ref: s.ref, + T: e.Ts, + V: e.Value, + Labels: e.Labels, + }) + + a.metrics.totalAppendedExemplars.Inc() + return storage.SeriesRef(s.ref), nil +} + +func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if h != nil { + if err := h.Validate(); err != nil { + return 0, err + } + } + + if fh != nil { + if err := fh.Validate(); err != nil { + return 0, err + } + } + + // series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) + + series := a.series.GetByID(headRef) + if series == nil { + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + + a.metrics.numActiveSeries.Inc() + } + } + + series.Lock() + defer series.Unlock() + + if t <= a.minValidTime(series.lastTs) { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + + switch { + case h != nil: + // NOTE: always modify pendingHistograms and histogramSeries together + a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{ + Ref: series.ref, + T: t, + H: h, + }) + a.histogramSeries = append(a.histogramSeries, series) + case fh != nil: + // NOTE: always modify pendingFloatHistograms and floatHistogramSeries together + a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: t, + FH: fh, + }) + a.floatHistogramSeries = append(a.floatHistogramSeries, series) + } + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + return storage.SeriesRef(series.ref), nil +} + +func (*appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { + // TODO: Wire metadata in the Agent's appender. + return 0, nil +} + +func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if h != nil { + if err := h.Validate(); err != nil { + return 0, err + } + } + if fh != nil { + if err := fh.Validate(); err != nil { + return 0, err + } + } + if st >= t { + return 0, storage.ErrSTNewerThanSample + } + + series := a.series.GetByID(chunks.HeadSeriesRef(ref)) + if series == nil { + // Ensure no empty labels have gotten through. + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + a.metrics.numActiveSeries.Inc() + } + } + + series.Lock() + defer series.Unlock() + + if st <= a.minValidTime(series.lastTs) { + return 0, storage.ErrOutOfOrderST + } + + if st <= series.lastTs { + // discard the sample if it's out of order. + return 0, storage.ErrOutOfOrderST + } + series.lastTs = st + + switch { + case h != nil: + zeroHistogram := &histogram.Histogram{} + a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{ + Ref: series.ref, + T: st, + H: zeroHistogram, + }) + a.histogramSeries = append(a.histogramSeries, series) + case fh != nil: + a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: st, + FH: &histogram.FloatHistogram{}, + }) + a.floatHistogramSeries = append(a.floatHistogramSeries, series) + } + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + return storage.SeriesRef(series.ref), nil +} + +func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) { + if st >= t { + return 0, storage.ErrSTNewerThanSample + } + + series := a.series.GetByID(chunks.HeadSeriesRef(ref)) + if series == nil { + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) + } + + newSeries, created := a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: newSeries.ref, + Labels: l, + }) + a.metrics.numActiveSeries.Inc() + } + + series = newSeries + } + + series.Lock() + defer series.Unlock() + + if t <= a.minValidTime(series.lastTs) { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + + if st <= series.lastTs { + // discard the sample if it's out of order. + return 0, storage.ErrOutOfOrderST + } + series.lastTs = st + + // NOTE: always modify pendingSamples and sampleSeries together. + a.pendingSamples = append(a.pendingSamples, record.RefSample{ + Ref: series.ref, + T: st, + V: 0, + }) + a.sampleSeries = append(a.sampleSeries, series) + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc() + + return storage.SeriesRef(series.ref), nil +} + +// Commit submits the collected samples and purges the batch. +func (a *appender) Commit() error { + if err := a.log(); err != nil { + return err + } + + a.clearData() + a.appenderPool.Put(a) + + if a.writeNotified != nil { + a.writeNotified.Notify() + } + return nil +} + +// log logs all pending data to the WAL. +func (a *appender) log() error { + a.mtx.RLock() + defer a.mtx.RUnlock() + + var encoder record.Encoder + buf := a.bufPool.Get().([]byte) + defer func() { + a.bufPool.Put(buf) //nolint:staticcheck + }() + + if len(a.pendingSeries) > 0 { + buf = encoder.Series(a.pendingSeries, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + if len(a.pendingSamples) > 0 { + buf = encoder.Samples(a.pendingSamples, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + if len(a.pendingHistograms) > 0 { + var customBucketsHistograms []record.RefHistogramSample + buf, customBucketsHistograms = encoder.HistogramSamples(a.pendingHistograms, buf) + if len(buf) > 0 { + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + if len(customBucketsHistograms) > 0 { + buf = encoder.CustomBucketsHistogramSamples(customBucketsHistograms, nil) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + } + + if len(a.pendingFloatHistograms) > 0 { + var customBucketsFloatHistograms []record.RefFloatHistogramSample + buf, customBucketsFloatHistograms = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) + if len(buf) > 0 { + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + if len(customBucketsFloatHistograms) > 0 { + buf = encoder.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, nil) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + } + + if len(a.pendingExamplars) > 0 { + buf = encoder.Exemplars(a.pendingExamplars, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + var series *memSeries + for i, s := range a.pendingSamples { + series = a.sampleSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } + for i, s := range a.pendingHistograms { + series = a.histogramSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } + for i, s := range a.pendingFloatHistograms { + series = a.floatHistogramSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } + + return nil +} + +// clearData clears all pending data. +func (a *appender) clearData() { + a.pendingSeries = a.pendingSeries[:0] + a.pendingSamples = a.pendingSamples[:0] + a.pendingHistograms = a.pendingHistograms[:0] + a.pendingFloatHistograms = a.pendingFloatHistograms[:0] + a.pendingExamplars = a.pendingExamplars[:0] + a.sampleSeries = a.sampleSeries[:0] + a.histogramSeries = a.histogramSeries[:0] + a.floatHistogramSeries = a.floatHistogramSeries[:0] +} + +func (a *appender) Rollback() error { + // Series are created in-memory regardless of rollback. This means we must + // log them to the WAL, otherwise subsequent commits may reference a series + // which was never written to the WAL. + if err := a.logSeries(); err != nil { + return err + } + + a.clearData() + a.appenderPool.Put(a) + return nil +} + +// logSeries logs only pending series records to the WAL. +func (a *appender) logSeries() error { + a.mtx.RLock() + defer a.mtx.RUnlock() + + if len(a.pendingSeries) > 0 { + buf := a.bufPool.Get().([]byte) + defer func() { + a.bufPool.Put(buf) //nolint:staticcheck + }() + + var encoder record.Encoder + buf = encoder.Series(a.pendingSeries, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + return nil +} + +// minValidTime returns the minimum timestamp that a sample can have +// and is needed for preventing underflow. +func (a *appender) minValidTime(lastTs int64) int64 { + if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow { + return math.MinInt64 + } + + return lastTs - a.opts.OutOfOrderTimeWindow +} diff --git a/tsdb/agent/db_append_v2_test.go b/tsdb/agent/db_append_v2_test.go new file mode 100644 index 0000000000..7409f79ec5 --- /dev/null +++ b/tsdb/agent/db_append_v2_test.go @@ -0,0 +1,1396 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "errors" + "fmt" + "io" + "math" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/wlog" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestDB_InvalidSeries(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + defer s.Close() + + app := s.Appender(context.Background()) + + t.Run("Samples", func(t *testing.T) { + _, err := app.Append(0, labels.Labels{}, 0, 0) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") + + _, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") + }) + + t.Run("Histograms", func(t *testing.T) { + _, err := app.AppendHistogram(0, labels.Labels{}, 0, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") + + _, err = app.AppendHistogram(0, labels.FromStrings("a", "1", "a", "2"), 0, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") + }) + + t.Run("Exemplars", func(t *testing.T) { + sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0) + require.NoError(t, err, "should not reject valid series") + + _, err = app.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{}) + require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0") + + e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1", "a", "2")} + _, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels") + + e = exemplar.Exemplar{Labels: labels.FromStrings("a_somewhat_long_trace_id", "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa")} + _, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length") + + // Inverse check + e = exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true} + _, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + require.NoError(t, err, "should not reject valid exemplars") + }) +} + +func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *DB { + t.Helper() + + dbDir := t.TempDir() + rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false) + t.Cleanup(func() { + require.NoError(t, rs.Close()) + }) + + db, err := Open(promslog.NewNopLogger(), reg, rs, dbDir, opts) + require.NoError(t, err) + return db +} + +func TestUnsupportedFunctions(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + defer s.Close() + + t.Run("Querier", func(t *testing.T) { + _, err := s.Querier(0, 0) + require.Equal(t, err, ErrUnsupported) + }) + + t.Run("ChunkQuerier", func(t *testing.T) { + _, err := s.ChunkQuerier(0, 0) + require.Equal(t, err, ErrUnsupported) + }) + + t.Run("ExemplarQuerier", func(t *testing.T) { + _, err := s.ExemplarQuerier(context.TODO()) + require.Equal(t, err, ErrUnsupported) + }) +} + +func TestCommit(t *testing.T) { + const ( + numDatapoints = 1000 + numHistograms = 100 + numSeries = 8 + ) + + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name(), numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for i := range numDatapoints { + sample := chunks.GenerateSamples(0, 1) + ref, err := app.Append(0, lset, sample[0].T(), sample[0].F()) + require.NoError(t, err) + + e := exemplar.Exemplar{ + Labels: lset, + Ts: sample[0].T() + int64(i), + Value: sample[0].F(), + HasTs: true, + } + _, err = app.AppendExemplar(ref, lset, e) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), customBucketHistograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), nil, customBucketFloatHistograms[i]) + require.NoError(t, err) + } + } + + require.NoError(t, app.Commit()) + require.NoError(t, s.Close()) + + sr, err := wlog.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + defer func() { + require.NoError(t, sr.Close()) + }() + + // Read records from WAL and check for expected count of series, samples, and exemplars. + var ( + r = wlog.NewReader(sr) + dec = record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) + + walSeriesCount, walSamplesCount, walExemplarsCount, walHistogramCount, walFloatHistogramCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) + + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) + + case record.HistogramSamples, record.CustomBucketsHistogramSamples: + var histograms []record.RefHistogramSample + histograms, err = dec.HistogramSamples(rec, histograms) + require.NoError(t, err) + walHistogramCount += len(histograms) + + case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: + var floatHistograms []record.RefFloatHistogramSample + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + require.NoError(t, err) + walFloatHistogramCount += len(floatHistograms) + + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + default: + } + } + + // Check that the WAL contained the same number of committed series/samples/exemplars. + require.Equal(t, numSeries*5, walSeriesCount, "unexpected number of series") + require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples") + require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars") + require.Equal(t, numSeries*numHistograms*2, walHistogramCount, "unexpected number of histograms") + require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms") +} + +func TestRollback(t *testing.T) { + const ( + numDatapoints = 1000 + numHistograms = 100 + numSeries = 8 + ) + + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name(), numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for range numDatapoints { + sample := chunks.GenerateSamples(0, 1) + _, err := app.Append(0, lset, sample[0].T(), sample[0].F()) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + // Do a rollback, which should clear uncommitted data. A followup call to + // commit should persist nothing to the WAL. + require.NoError(t, app.Rollback()) + require.NoError(t, app.Commit()) + require.NoError(t, s.Close()) + + sr, err := wlog.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + defer func() { + require.NoError(t, sr.Close()) + }() + + // Read records from WAL and check for expected count of series and samples. + var ( + r = wlog.NewReader(sr) + dec = record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) + + walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) + + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) + + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + case record.HistogramSamples, record.CustomBucketsHistogramSamples: + var histograms []record.RefHistogramSample + histograms, err = dec.HistogramSamples(rec, histograms) + require.NoError(t, err) + walHistogramCount += len(histograms) + + case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: + var floatHistograms []record.RefFloatHistogramSample + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + require.NoError(t, err) + walFloatHistogramCount += len(floatHistograms) + + default: + } + } + + // Check that only series get stored after calling Rollback. + require.Equal(t, numSeries*5, walSeriesCount, "series should have been written to WAL") + require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") + require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") + require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL") + require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL") +} + +func TestFullTruncateWAL(t *testing.T) { + const ( + numDatapoints = 1000 + numHistograms = 100 + numSeries = 800 + lastTs = 500 + ) + + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.TruncateFrequency = time.Minute * 2 + + s := createTestAgentDB(t, reg, opts) + defer func() { + require.NoError(t, s.Close()) + }() + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name(), numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for range numDatapoints { + _, err := app.Append(0, lset, int64(lastTs), 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + // Truncate WAL with mint to GC all the samples. + s.truncate(lastTs + 1) + + m := gatherFamily(t, reg, "prometheus_agent_deleted_series") + require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") +} + +func TestPartialTruncateWAL(t *testing.T) { + const ( + numDatapoints = 1000 + numSeries = 800 + ) + + opts := DefaultOptions() + + reg := prometheus.NewRegistry() + s := createTestAgentDB(t, reg, opts) + defer func() { + require.NoError(t, s.Close()) + }() + app := s.Appender(context.TODO()) + + // Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500. + var lastTs int64 = 500 + lbls := labelsForTest(t.Name()+"batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for range numDatapoints { + _, err := app.Append(0, lset, lastTs, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_histogram_batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram_batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_float_histogram_batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram_batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + // Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600. + lastTs = 600 + lbls = labelsForTest(t.Name()+"batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for range numDatapoints { + _, err := app.Append(0, lset, lastTs, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_histogram_batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram_batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_float_histogram_batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram_batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numDatapoints) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + // Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series. + s.truncate(lastTs - 1) + + m := gatherFamily(t, reg, "prometheus_agent_deleted_series") + require.Len(t, m.Metric, 1) + require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") +} + +func TestWALReplay(t *testing.T) { + const ( + numDatapoints = 1000 + numHistograms = 100 + numSeries = 8 + lastTs = 500 + ) + + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name(), numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for range numDatapoints { + _, err := app.Append(0, lset, lastTs, 0) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := range numHistograms { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + require.NoError(t, app.Commit()) + require.NoError(t, s.Close()) + + // Hack: s.wal.Dir() is the /wal subdirectory of the original storage path. + // We need the original directory so we can recreate the storage for replay. + storageDir := filepath.Dir(s.wal.Dir()) + + reg := prometheus.NewRegistry() + replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + defer func() { + require.NoError(t, replayStorage.Close()) + }() + + // Check if all the series are retrieved back from the WAL. + m := gatherFamily(t, reg, "prometheus_agent_active_series") + require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") + + // Check if lastTs of the samples retrieved from the WAL is retained. + metrics := replayStorage.series.series + for i := range metrics { + mp := metrics[i] + for _, v := range mp { + require.Equal(t, v.lastTs, int64(lastTs)) + } + } +} + +func TestLockfile(t *testing.T) { + tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { + logger := promslog.NewNopLogger() + reg := prometheus.NewRegistry() + rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false) + t.Cleanup(func() { + require.NoError(t, rs.Close()) + }) + + opts := DefaultOptions() + opts.NoLockfile = !createLock + + // Create the DB. This should create lockfile and its metrics. + db, err := Open(logger, nil, rs, data, opts) + require.NoError(t, err) + + return db.locker, testutil.NewCallbackCloser(func() { + require.NoError(t, db.Close()) + }) + }) +} + +func Test_ExistingWAL_NextRef(t *testing.T) { + dbDir := t.TempDir() + rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false) + defer func() { + require.NoError(t, rs.Close()) + }() + + db, err := Open(promslog.NewNopLogger(), nil, rs, dbDir, DefaultOptions()) + require.NoError(t, err) + + seriesCount := 10 + + // Append series + app := db.Appender(context.Background()) + for i := range seriesCount { + lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("series_%d", i)) + _, err := app.Append(0, lset, 0, 100) + require.NoError(t, err) + } + + histogramCount := 10 + histograms := tsdbutil.GenerateTestHistograms(histogramCount) + // Append series + for i := range histogramCount { + lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("histogram_%d", i)) + _, err := app.AppendHistogram(0, lset, 0, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Truncate the WAL to force creation of a new segment. + require.NoError(t, db.truncate(0)) + require.NoError(t, db.Close()) + + // Create a new storage and see what nextRef is initialized to. + db, err = Open(promslog.NewNopLogger(), nil, rs, dbDir, DefaultOptions()) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + require.Equal(t, uint64(seriesCount+histogramCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL") +} + +func Test_validateOptions(t *testing.T) { + t.Run("Apply defaults to zero values", func(t *testing.T) { + opts := validateOptions(&Options{}) + require.Equal(t, DefaultOptions(), opts) + }) + + t.Run("Defaults are already valid", func(t *testing.T) { + require.Equal(t, DefaultOptions(), validateOptions(nil)) + }) + + t.Run("MaxWALTime should not be lower than TruncateFrequency", func(t *testing.T) { + opts := validateOptions(&Options{ + MaxWALTime: int64(time.Hour / time.Millisecond), + TruncateFrequency: 2 * time.Hour, + }) + require.Equal(t, int64(2*time.Hour/time.Millisecond), opts.MaxWALTime) + }) +} + +func startTime() (int64, error) { + return time.Now().Unix() * 1000, nil +} + +// Create series for tests. +func labelsForTest(lName string, seriesCount int) [][]labels.Label { + var series [][]labels.Label + + for i := range seriesCount { + lset := []labels.Label{ + {Name: "a", Value: lName}, + {Name: "instance", Value: "localhost" + strconv.Itoa(i)}, + {Name: "job", Value: "prometheus"}, + } + series = append(series, lset) + } + + return series +} + +func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto.MetricFamily { + t.Helper() + + families, err := reg.Gather() + require.NoError(t, err, "failed to gather metrics") + + for _, f := range families { + if f.GetName() == familyName { + return f + } + } + + t.Fatalf("could not find family %s", familyName) + + return nil +} + +func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.Background()) + defer s.Close() + + sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0) + require.NoError(t, err, "should not reject valid series") + + // Write a few exemplars to our appender and call Commit(). + // If the Labels, Value or Timestamp are different than the last exemplar, + // then a new one should be appended; Otherwise, it should be skipped. + e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true} + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + + e.Labels = labels.FromStrings("b", "2") + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + + e.Value = 42 + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + + e.Ts = 25 + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + + require.NoError(t, app.Commit()) + + // Read back what was written to the WAL. + var walExemplarsCount int + sr, err := wlog.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + defer sr.Close() + r := wlog.NewReader(sr) + + dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) + for r.Next() { + rec := r.Record() + if dec.Type(rec) == record.Exemplars { + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + } + } + + // We had 9 calls to AppendExemplar but only 4 of those should have gotten through. + require.Equal(t, 4, walExemplarsCount) +} + +func TestDBAllowOOOSamples(t *testing.T) { + const ( + numDatapoints = 5 + numHistograms = 5 + numSeries = 4 + offset = 100 + ) + + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = math.MaxInt64 + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + // Let's add some samples in the [offset, offset+numDatapoints) range. + lbls := labelsForTest(t.Name(), numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for i := offset; i < numDatapoints+offset; i++ { + ref, err := app.Append(0, lset, int64(i), float64(i)) + require.NoError(t, err) + + e := exemplar.Exemplar{ + Labels: lset, + Ts: int64(i) * 2, + Value: float64(i), + HasTs: true, + } + _, err = app.AppendExemplar(ref, lset, e) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numHistograms) + + for i := offset; i < numDatapoints+offset; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := offset; i < numDatapoints+offset; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) + + for i := offset; i < numDatapoints+offset; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset]) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := offset; i < numDatapoints+offset; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset]) + require.NoError(t, err) + } + } + + require.NoError(t, app.Commit()) + m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total") + require.Equal(t, float64(20), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") + require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") + require.NoError(t, s.Close()) + + // Hack: s.wal.Dir() is the /wal subdirectory of the original storage path. + // We need the original directory so we can recreate the storage for replay. + storageDir := filepath.Dir(s.wal.Dir()) + + // Replay the storage so that the lastTs for each series is recorded. + reg2 := prometheus.NewRegistry() + db, err := Open(s.logger, reg2, nil, storageDir, s.opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + app = db.Appender(context.Background()) + + // Now the lastTs will have been recorded successfully. + // Let's try appending twice as many OOO samples in the [0, numDatapoints) range. + lbls = labelsForTest(t.Name()+"_histogram", numSeries*2) + for _, l := range lbls { + lset := labels.New(l...) + + for i := range numDatapoints { + ref, err := app.Append(0, lset, int64(i), float64(i)) + require.NoError(t, err) + + e := exemplar.Exemplar{ + Labels: lset, + Ts: int64(i) * 2, + Value: float64(i), + HasTs: true, + } + _, err = app.AppendExemplar(ref, lset, e) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_histogram", numSeries*2) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestHistograms(numHistograms) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries*2) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries*2) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries*2) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) + + for i := range numDatapoints { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + + require.NoError(t, app.Commit()) + m = gatherFamily(t, reg2, "prometheus_agent_samples_appended_total") + require.Equal(t, float64(40), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") + require.Equal(t, float64(160), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") + require.NoError(t, db.Close()) +} + +func TestDBOutOfOrderTimeWindow(t *testing.T) { + tc := []struct { + outOfOrderTimeWindow, firstTs, secondTs int64 + expectedError error + }{ + {0, 100, 101, nil}, + {0, 100, 100, storage.ErrOutOfOrderSample}, + {0, 100, 99, storage.ErrOutOfOrderSample}, + {100, 100, 1, nil}, + {100, 100, 0, storage.ErrOutOfOrderSample}, + } + + for _, c := range tc { + t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, expectedError=%s", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.expectedError), func(t *testing.T) { + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = c.outOfOrderTimeWindow + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name()+"_histogram", 1) + lset := labels.New(lbls[0]...) + _, err := app.AppendHistogram(0, lset, c.firstTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + _, err = app.AppendHistogram(0, lset, c.secondTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, c.expectedError) + + lbls = labelsForTest(t.Name(), 1) + lset = labels.New(lbls[0]...) + _, err = app.Append(0, lset, c.firstTs, 0) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + _, err = app.Append(0, lset, c.secondTs, 0) + require.ErrorIs(t, err, c.expectedError) + + expectedAppendedSamples := float64(2) + if c.expectedError != nil { + expectedAppendedSamples = 1 + } + m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total") + require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") + require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") + require.NoError(t, s.Close()) + }) + } +} + +type walSample struct { + t int64 + f float64 + h *histogram.Histogram + lbls labels.Labels + ref storage.SeriesRef +} + +func TestDBStartTimestampSamplesIngestion(t *testing.T) { + t.Parallel() + + type appendableSample struct { + t int64 + st int64 + v float64 + lbls labels.Labels + h *histogram.Histogram + expectsError bool + } + + testHistogram := tsdbutil.GenerateTestHistograms(1)[0] + zeroHistogram := &histogram.Histogram{} + + lbls := labelsForTest(t.Name(), 1) + defLbls := labels.New(lbls[0]...) + + testCases := []struct { + name string + inputSamples []appendableSample + expectedSamples []*walSample + expectedSeriesCount int + }{ + { + name: "in order ct+normal sample/floatSamples", + inputSamples: []appendableSample{ + {t: 100, st: 1, v: 10, lbls: defLbls}, + {t: 101, st: 1, v: 10, lbls: defLbls}, + }, + expectedSamples: []*walSample{ + {t: 1, f: 0, lbls: defLbls}, + {t: 100, f: 10, lbls: defLbls}, + {t: 101, f: 10, lbls: defLbls}, + }, + }, + { + name: "ST+float && ST+histogram samples", + inputSamples: []appendableSample{ + { + t: 100, + st: 30, + v: 20, + lbls: defLbls, + }, + { + t: 300, + st: 230, + h: testHistogram, + lbls: defLbls, + }, + }, + expectedSamples: []*walSample{ + {t: 30, f: 0, lbls: defLbls}, + {t: 100, f: 20, lbls: defLbls}, + {t: 230, h: zeroHistogram, lbls: defLbls}, + {t: 300, h: testHistogram, lbls: defLbls}, + }, + expectedSeriesCount: 1, + }, + { + name: "ST+float && ST+histogram samples with error", + inputSamples: []appendableSample{ + { + // invalid ST + t: 100, + st: 100, + v: 10, + lbls: defLbls, + expectsError: true, + }, + { + // invalid ST histogram + t: 300, + st: 300, + h: testHistogram, + lbls: defLbls, + expectsError: true, + }, + }, + expectedSamples: []*walSample{ + {t: 100, f: 10, lbls: defLbls}, + {t: 300, h: testHistogram, lbls: defLbls}, + }, + expectedSeriesCount: 0, + }, + { + name: "In order ct+normal sample/histogram", + inputSamples: []appendableSample{ + {t: 100, h: testHistogram, st: 1, lbls: defLbls}, + {t: 101, h: testHistogram, st: 1, lbls: defLbls}, + }, + expectedSamples: []*walSample{ + {t: 1, h: &histogram.Histogram{}}, + {t: 100, h: testHistogram}, + {t: 101, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}}, + }, + }, + { + name: "ct+normal then OOO sample/float", + inputSamples: []appendableSample{ + {t: 60_000, st: 40_000, v: 10, lbls: defLbls}, + {t: 120_000, st: 40_000, v: 10, lbls: defLbls}, + {t: 180_000, st: 40_000, v: 10, lbls: defLbls}, + {t: 50_000, st: 40_000, v: 10, lbls: defLbls}, + }, + expectedSamples: []*walSample{ + {t: 40_000, f: 0, lbls: defLbls}, + {t: 50_000, f: 10, lbls: defLbls}, + {t: 60_000, f: 10, lbls: defLbls}, + {t: 120_000, f: 10, lbls: defLbls}, + {t: 180_000, f: 10, lbls: defLbls}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 360_000 + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + for _, sample := range tc.inputSamples { + // We supposed to write a Histogram to the WAL + if sample.h != nil { + _, err := app.AppendHistogramSTZeroSample(0, sample.lbls, sample.t, sample.st, zeroHistogram, nil) + if !errors.Is(err, storage.ErrOutOfOrderST) { + require.Equal(t, sample.expectsError, err != nil, "expected error: %v, got: %v", sample.expectsError, err) + } + + _, err = app.AppendHistogram(0, sample.lbls, sample.t, sample.h, nil) + require.NoError(t, err) + } else { + // We supposed to write a float sample to the WAL + _, err := app.AppendSTZeroSample(0, sample.lbls, sample.t, sample.st) + if !errors.Is(err, storage.ErrOutOfOrderST) { + require.Equal(t, sample.expectsError, err != nil, "expected error: %v, got: %v", sample.expectsError, err) + } + + _, err = app.Append(0, sample.lbls, sample.t, sample.v) + require.NoError(t, err) + } + } + + require.NoError(t, app.Commit()) + // Close the DB to ensure all data is flushed to the WAL + require.NoError(t, s.Close()) + + // Check that we dont have any OOO samples in the WAL by checking metrics + families, err := reg.Gather() + require.NoError(t, err, "failed to gather metrics") + for _, f := range families { + if f.GetName() == "prometheus_agent_out_of_order_samples_total" { + t.Fatalf("unexpected metric %s", f.GetName()) + } + } + + outputSamples := readWALSamples(t, s.wal.Dir()) + + require.Len(t, outputSamples, len(tc.expectedSamples), "Expected %d samples", len(tc.expectedSamples)) + + for i, expectedSample := range tc.expectedSamples { + for _, sample := range outputSamples { + if sample.t == expectedSample.t && sample.lbls.String() == expectedSample.lbls.String() { + if expectedSample.h != nil { + require.Equal(t, expectedSample.h, sample.h, "histogram value mismatch (sample index %d)", i) + } else { + require.Equal(t, expectedSample.f, sample.f, "value mismatch (sample index %d)", i) + } + } + } + } + }) + } +} + +func readWALSamples(t *testing.T, walDir string) []*walSample { + t.Helper() + sr, err := wlog.NewSegmentsReader(walDir) + require.NoError(t, err) + defer func(sr io.ReadCloser) { + err := sr.Close() + require.NoError(t, err) + }(sr) + + r := wlog.NewReader(sr) + dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) + + var ( + samples []record.RefSample + histograms []record.RefHistogramSample + + lastSeries record.RefSeries + outputSamples = make([]*walSample, 0) + ) + + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series, err := dec.Series(rec, nil) + require.NoError(t, err) + lastSeries = series[0] + case record.Samples: + samples, err = dec.Samples(rec, samples[:0]) + require.NoError(t, err) + for _, s := range samples { + outputSamples = append(outputSamples, &walSample{ + t: s.T, + f: s.V, + lbls: lastSeries.Labels.Copy(), + ref: storage.SeriesRef(lastSeries.Ref), + }) + } + case record.HistogramSamples: + histograms, err = dec.HistogramSamples(rec, histograms[:0]) + require.NoError(t, err) + for _, h := range histograms { + outputSamples = append(outputSamples, &walSample{ + t: h.T, + h: h.H, + lbls: lastSeries.Labels.Copy(), + ref: storage.SeriesRef(lastSeries.Ref), + }) + } + } + } + + return outputSamples +} + +func BenchmarkCreateSeries(b *testing.B) { + s := createTestAgentDB(b, nil, DefaultOptions()) + defer s.Close() + + app := s.Appender(context.Background()).(*appender) + lbls := make([]labels.Labels, b.N) + + for i, l := range labelsForTest("benchmark", b.N) { + lbls[i] = labels.New(l...) + } + + b.ResetTimer() + + for _, l := range lbls { + app.getOrCreate(l) + } +}