diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ad9108a6df..685b45fa7a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -844,10 +844,16 @@ func main() { template.RegisterFeatures(features.DefaultRegistry) var ( - localStorage = &readyStorage{stats: tsdb.NewDBStats()} - scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels) - fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) + localStorage = &readyStorage{stats: tsdb.NewDBStats()} + scraper = &readyScrapeManager{} + storeST = cfg.tsdb.EnableSTStorage + ) + if agentMode { + storeST = cfg.agent.EnableSTStorage + } + var ( + remoteStorage = remote.NewStorageWithStoreST(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels, storeST) + fanoutStorage = storage.NewFanoutWithStoreST(logger, storeST, localStorage, remoteStorage) ) var ( diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index e7a9a7f18a..fd9362d8c5 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -84,11 +84,11 @@ func getCompatibleBlockDuration(maxBlockDuration int64) int64 { return blockDuration } -func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string) (returnErr error) { +func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string, stStorageEnabled bool) (returnErr error) { blockDuration := getCompatibleBlockDuration(maxBlockDuration) mint = blockDuration * (mint / blockDuration) - db, err := 
tsdb.OpenDBReadOnly(outputDir, "", nil) + db, err := tsdb.OpenDBReadOnly(outputDir, "", nil, stStorageEnabled) if err != nil { return err } @@ -228,13 +228,13 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn return nil } -func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) (err error) { +func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string, stStorageEnabled bool) (err error) { p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps. maxt, mint, err := getMinAndMaxTimestamps(p) if err != nil { return fmt.Errorf("getting min and max timestamp: %w", err) } - if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels); err != nil { + if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels, stStorageEnabled); err != nil { return fmt.Errorf("block creation: %w", err) } return nil diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index 499b90e99a..84a78ef18b 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -735,7 +735,7 @@ after_eof 1 2 outputDir := t.TempDir() - err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration, test.Labels) + err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration, test.Labels, false) if !test.IsOk { require.Error(t, err, test.Description) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index abb709c31d..4574668014 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -282,6 
+282,7 @@ func main() { importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.") importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool() importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool() + importSTStorage := importCmd.Flag("st-storage", "Enable storage of sample start times in blocks.").Hidden().Bool() maxBlockDuration := importCmd.Flag("max-block-duration", "Maximum duration created blocks may span. Anything less than 2h is ignored.").Hidden().PlaceHolder("").Duration() openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.") openMetricsLabels := openMetricsImportCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times. Example --label=label_name=label_value").StringMap() @@ -452,7 +453,7 @@ func main() { os.Exit(checkErr(dumpTSDBData(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics, promtoolParser))) // TODO(aSquare14): Work on adding support for custom block size. 
case openMetricsImportCmd.FullCommand(): - os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration, *openMetricsLabels)) + os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration, *openMetricsLabels, *importSTStorage)) case importRulesCmd.FullCommand(): os.Exit(checkErr(importRules(serverURL, httpRoundTripper, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, model.UTF8Validation, *importRulesFiles...))) diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 1aaf87bc42..4f49519e65 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -333,7 +333,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { } func listBlocks(path string, humanReadable bool) error { - db, err := tsdb.OpenDBReadOnly(path, "", nil) + db, err := tsdb.OpenDBReadOnly(path, "", nil, true) if err != nil { return err } @@ -388,7 +388,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string { } func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) { - db, err := tsdb.OpenDBReadOnly(path, "", nil) + db, err := tsdb.OpenDBReadOnly(path, "", nil, true) if err != nil { return nil, nil, err } @@ -707,7 +707,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. 
type SeriesSetFormatter func(series storage.SeriesSet) error func dumpTSDBData(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter, p parser.Parser) (err error) { - db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil) + db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil, true) if err != nil { return err } @@ -845,7 +845,7 @@ func checkErr(err error) int { return 0 } -func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int { +func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string, stStorageEnabled bool) int { var buf []byte info, err := os.Stat(path) if err != nil { @@ -870,7 +870,7 @@ func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxB return checkErr(fmt.Errorf("create output dir: %w", err)) } - return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels)) + return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels, stStorageEnabled)) } func displayHistogram(dataType string, datas []int, total int) { diff --git a/cmd/promtool/tsdb_posix_test.go b/cmd/promtool/tsdb_posix_test.go index 9d0034844f..9316748bc6 100644 --- a/cmd/promtool/tsdb_posix_test.go +++ b/cmd/promtool/tsdb_posix_test.go @@ -53,7 +53,7 @@ func TestTSDBDumpOpenMetricsRoundTripPipe(t *testing.T) { }() // Import samples from OM format - code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{}) + code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{}, false) require.Equal(t, 0, code) db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil) require.NoError(t, err) diff --git a/cmd/promtool/tsdb_test.go b/cmd/promtool/tsdb_test.go index 86d7c67d77..65abac64cc 100644 --- 
a/cmd/promtool/tsdb_test.go +++ b/cmd/promtool/tsdb_test.go @@ -228,7 +228,7 @@ func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) { dbDir := t.TempDir() // Import samples from OM format - err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour, map[string]string{}) + err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour, map[string]string{}, false) require.NoError(t, err) db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil) require.NoError(t, err) diff --git a/storage/fanout.go b/storage/fanout.go index 21f5f715e4..248fc43e58 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -31,6 +31,7 @@ type fanout struct { primary Storage secondaries []Storage + storeST bool } // NewFanout returns a new fanout Storage, which proxies reads and writes @@ -43,10 +44,16 @@ type fanout struct { // // NOTE: In the case of Prometheus, it treats all remote storages as secondary / best effort. func NewFanout(logger *slog.Logger, primary Storage, secondaries ...Storage) Storage { + return NewFanoutWithStoreST(logger, false, primary, secondaries...) +} + +// NewFanoutWithStoreST returns a new fanout Storage with start timestamp storage enabled or disabled. 
+func NewFanoutWithStoreST(logger *slog.Logger, storeST bool, primary Storage, secondaries ...Storage) Storage { return &fanout{ logger: logger, primary: primary, secondaries: secondaries, + storeST: storeST, } } @@ -120,7 +127,7 @@ func (f *fanout) ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) { } secondaries = append(secondaries, querier) } - return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil + return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMergerWithStoreST(ChainedSeriesMerge, f.storeST)), nil } func (f *fanout) Appender(ctx context.Context) Appender { diff --git a/storage/merge.go b/storage/merge.go index 76bf0994e0..ec17e39ee1 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -722,6 +722,11 @@ func (h *samplesIteratorHeap) Pop() any { // NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead // to handle overlaps between series. func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc { + return NewCompactingChunkSeriesMergerWithStoreST(mergeFunc, false) +} + +// NewCompactingChunkSeriesMergerWithStoreST is like NewCompactingChunkSeriesMerger, but uses storeST when re-encoding. 
+func NewCompactingChunkSeriesMergerWithStoreST(mergeFunc VerticalSeriesMergeFunc, storeST bool) VerticalChunkSeriesMergeFunc { return func(series ...ChunkSeries) ChunkSeries { if len(series) == 0 { return nil @@ -736,6 +741,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC return &compactChunkIterator{ mergeFunc: mergeFunc, iterators: iterators, + storeST: storeST, } }, } @@ -748,6 +754,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC type compactChunkIterator struct { mergeFunc VerticalSeriesMergeFunc iterators []chunks.Iterator + storeST bool h chunkIteratorHeap @@ -813,7 +820,7 @@ func (c *compactChunkIterator) Next() bool { } // Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here. - iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil) + iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...), c.storeST).Iterator(nil) if !iter.Next() { if c.err = iter.Err(); c.err != nil { return false diff --git a/storage/remote/read.go b/storage/remote/read.go index 70b55980b8..0bd9877bda 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -29,6 +29,7 @@ type sampleAndChunkQueryableClient struct { requiredMatchers []*labels.Matcher readRecent bool callback startTimeCallback + storeST bool } // NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets. 
@@ -38,6 +39,7 @@ func NewSampleAndChunkQueryableClient( requiredMatchers []*labels.Matcher, readRecent bool, callback startTimeCallback, + storeST bool, ) storage.SampleAndChunkQueryable { return &sampleAndChunkQueryableClient{ client: c, @@ -46,6 +48,7 @@ func NewSampleAndChunkQueryableClient( requiredMatchers: requiredMatchers, readRecent: readRecent, callback: callback, + storeST: storeST, } } @@ -84,6 +87,7 @@ func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage. externalLabels: c.externalLabels, requiredMatchers: c.requiredMatchers, }, + storeST: c.storeST, } if c.readRecent { return cq, nil @@ -229,13 +233,14 @@ func (*querier) Close() error { // chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier. type chunkQuerier struct { querier + storeST bool } // Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client. // It uses remote.querier.Select so it supports external labels and required matchers if specified. func (q *chunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { // TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket). - return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...)) + return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...), q.storeST) } // Note strings in toFilter must be sorted. 
diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 49f29d9001..bb2cc3204f 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -527,6 +527,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { tc.requiredMatchers, tc.readRecent, tc.callback, + false, ) q, err := c.Querier(tc.mint, tc.maxt) require.NoError(t, err) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index be75d23383..e3d7cb366c 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -56,7 +56,8 @@ type Storage struct { logger *slog.Logger mtx sync.Mutex - rws *WriteStorage + rws *WriteStorage + storeST bool // For reads. queryables []storage.SampleAndChunkQueryable @@ -67,6 +68,11 @@ var _ storage.Storage = &Storage{} // NewStorage returns a remote.Storage. func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage { + return NewStorageWithStoreST(l, reg, stCallback, walDir, flushDeadline, sm, enableTypeAndUnitLabels, false) +} + +// NewStorageWithStoreST returns a remote.Storage with start timestamp storage enabled or disabled. 
+func NewStorageWithStoreST(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels, storeST bool) *Storage { if l == nil { l = promslog.NewNopLogger() } @@ -77,6 +83,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC logger: logger, deduper: deduper, localStartTimeCallback: stCallback, + storeST: storeST, } s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, enableTypeAndUnitLabels) return s @@ -139,6 +146,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { labelsToEqualityMatchers(rrConf.RequiredMatchers), rrConf.ReadRecent, s.localStartTimeCallback, + s.storeST, )) } s.queryables = queryables @@ -187,7 +195,7 @@ func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { } queriers = append(queriers, q) } - return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil + return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, s.storeST)), nil } // Appender implements storage.Storage. diff --git a/storage/series.go b/storage/series.go index bf6df7db3e..69e35bb3ce 100644 --- a/storage/series.go +++ b/storage/series.go @@ -289,11 +289,12 @@ func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series { type seriesSetToChunkSet struct { SeriesSet + storeST bool } // NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples. 
-func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet { - return &seriesSetToChunkSet{SeriesSet: chk} +func NewSeriesSetToChunkSet(chk SeriesSet, storeST bool) ChunkSeriesSet { + return &seriesSetToChunkSet{SeriesSet: chk, storeST: storeST} } func (c *seriesSetToChunkSet) Next() bool { @@ -304,7 +305,7 @@ func (c *seriesSetToChunkSet) Next() bool { } func (c *seriesSetToChunkSet) At() ChunkSeries { - return NewSeriesToChunkEncoder(c.SeriesSet.At()) + return NewSeriesToChunkEncoder(c.SeriesSet.At(), c.storeST) } func (c *seriesSetToChunkSet) Err() error { @@ -313,13 +314,14 @@ func (c *seriesSetToChunkSet) Err() error { type seriesToChunkEncoder struct { Series + storeST bool } const seriesToChunkEncoderSplit = 120 // NewSeriesToChunkEncoder encodes samples to chunks with 120 samples limit. -func NewSeriesToChunkEncoder(series Series) ChunkSeries { - return &seriesToChunkEncoder{series} +func NewSeriesToChunkEncoder(series Series, storeST bool) ChunkSeries { + return &seriesToChunkEncoder{Series: series, storeST: storeST} } func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { @@ -342,10 +344,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { + st := seriesIter.AtST() if typ != lastType || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. 
chks = appendChunk(chks, mint, maxt, chk) - chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding()) + chk, err = typ.NewChunk(s.storeST) if err != nil { return errChunksIterator{err: err} } @@ -360,19 +363,17 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { lastType = typ var ( - st, t int64 - v float64 - h *histogram.Histogram - fh *histogram.FloatHistogram + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram ) switch typ { case chunkenc.ValFloat: t, v = seriesIter.At() - st = seriesIter.AtST() app.Append(st, t, v) case chunkenc.ValHistogram: t, h = seriesIter.AtHistogram(nil) - st = seriesIter.AtST() newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false) if err != nil { return errChunksIterator{err: err} @@ -388,7 +389,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { } case chunkenc.ValFloatHistogram: t, fh = seriesIter.AtFloatHistogram(nil) - st = seriesIter.AtST() newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false) if err != nil { return errChunksIterator{err: err} diff --git a/storage/series_test.go b/storage/series_test.go index b33d6cb1b3..06224b8aa4 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -601,7 +601,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { } } series := NewListSeries(lbs, copiedSamples) - encoder := NewSeriesToChunkEncoder(series) + encoder := NewSeriesToChunkEncoder(series, false) require.Equal(t, lbs, encoder.Labels()) chks, err := ExpandChunks(encoder.Iterator(nil)) diff --git a/tsdb/block.go b/tsdb/block.go index 118dd672ef..b3455baef4 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -425,6 +425,13 @@ func (pb *Block) Size() int64 { return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta } +// SetSTStorageEnabled sets whether ST storage is enabled on the block's chunk reader. 
+func (pb *Block) SetSTStorageEnabled(enabled bool) { + if cr, ok := pb.chunkr.(*chunks.Reader); ok { + cr.SetSTStorageEnabled(enabled) + } +} + // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 6fb8de2a77..44133b89c4 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -76,6 +76,8 @@ type Chunk interface { Bytes() []byte // Encoding returns the encoding type of the chunk. + // If the chunk is capable of storing ST (start timestamps), it should + // return the appropriate encoding type (e.g., EncXOROptST). Encoding() Encoding // Appender returns an appender to append samples to the chunk. @@ -189,9 +191,12 @@ func (v ValueType) String() string { } } -func (v ValueType) ChunkEncoding() Encoding { +func (v ValueType) ChunkEncoding(storeST bool) Encoding { switch v { case ValFloat: + if storeST { + return EncXOROptST + } return EncXOR case ValHistogram: return EncHistogram @@ -202,17 +207,8 @@ func (v ValueType) ChunkEncoding() Encoding { } } -func (v ValueType) NewChunk() (Chunk, error) { - switch v { - case ValFloat: - return NewXORChunk(), nil - case ValHistogram: - return NewHistogramChunk(), nil - case ValFloatHistogram: - return NewFloatHistogramChunk(), nil - default: - return nil, fmt.Errorf("value type %v unsupported", v) - } +func (v ValueType) NewChunk(storeST bool) (Chunk, error) { + return NewEmptyChunk(v.ChunkEncoding(storeST)) } // MockSeriesIterator returns an iterator for a mock series with custom @@ -399,6 +395,7 @@ func FromData(e Encoding, d []byte) (Chunk, error) { } // NewEmptyChunk returns an empty chunk for the given encoding. +// TODO(krajorama): support storeST for histogram and float histogram chunks when they are implemented. 
func NewEmptyChunk(e Encoding) (Chunk, error) { switch e { case EncXOR: diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 9b4e011562..9cc06a32bd 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -135,7 +135,9 @@ type Meta struct { } // ChunkFromSamples requires all samples to have the same type. -// TODO(krajorama): test with ST when chunk formats support it. +// It is not efficient and meant for testing purposes only. +// It scans the samples to determine whether any sample has ST set and +// creates a chunk accordingly. func ChunkFromSamples(s []Sample) (Meta, error) { return ChunkFromSamplesGeneric(SampleSlice(s)) } @@ -154,7 +156,17 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { } sampleType := s.Get(0).Type() - c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding()) + + hasST := false + for i := range s.Len() { + if s.Get(i).ST() != 0 { + hasST = true + break + } + } + + // Request storing ST in the chunk if available. + c, err := sampleType.NewChunk(hasST) if err != nil { return Meta{}, err } @@ -627,6 +639,8 @@ type Reader struct { cs []io.Closer // Closers for resources behind the byte slices. size int64 // The total size of bytes in the reader. pool chunkenc.Pool + + stStorageEnabled bool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { @@ -695,6 +709,16 @@ func (s *Reader) Size() int64 { return s.size } +// SetSTStorageEnabled sets whether ST storage is enabled for this reader. +func (s *Reader) SetSTStorageEnabled(enabled bool) { + s.stStorageEnabled = enabled +} + +// STStorageEnabled returns whether ST storage is enabled for this reader. +func (s *Reader) STStorageEnabled() bool { + return s.stStorageEnabled +} + // ChunkOrIterable returns a chunk from a given reference. 
func (s *Reader) ChunkOrIterable(meta Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack() diff --git a/tsdb/chunks/chunks_test.go b/tsdb/chunks/chunks_test.go index f40f996fde..e3b0454e37 100644 --- a/tsdb/chunks/chunks_test.go +++ b/tsdb/chunks/chunks_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" ) @@ -58,3 +59,35 @@ func TestWriterWithDefaultSegmentSize(t *testing.T) { require.NoError(t, err) require.Len(t, d, 1, "expected only one segment to be created to hold both chunks") } + +func TestChunkFromSamplesWithST(t *testing.T) { + // Create samples with explicit ST (source timestamp) values + samples := []Sample{ + sample{t: 10, f: 11, st: 5}, + sample{t: 20, f: 12, st: 15}, + sample{t: 30, f: 13, st: 25}, + } + + chk, err := ChunkFromSamples(samples) + require.NoError(t, err) + require.NotNil(t, chk.Chunk) + + // Verify MinTime and MaxTime + require.Equal(t, int64(10), chk.MinTime) + require.Equal(t, int64(30), chk.MaxTime) + + // Iterate over the chunk and verify ST values are preserved + it := chk.Chunk.Iterator(nil) + idx := 0 + for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { + require.Equal(t, chunkenc.ValFloat, vt) + ts, v := it.At() + st := it.AtST() + require.Equal(t, samples[idx].ST(), st, "ST mismatch at index %d", idx) + require.Equal(t, samples[idx].T(), ts, "T mismatch at index %d", idx) + require.Equal(t, samples[idx].F(), v, "F mismatch at index %d", idx) + idx++ + } + require.NoError(t, it.Err()) + require.Equal(t, len(samples), idx, "expected all samples to be iterated") +} diff --git a/tsdb/compact.go b/tsdb/compact.go index 7091d34d50..a3da75a2d9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -178,6 +178,9 @@ type LeveledCompactorOptions struct { // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component 
that does compaction. EnableOverlappingCompaction bool + // EnableSTStorage determines whether compaction should re-encode chunks with start timestamps. + EnableSTStorage bool + // Metrics is set of metrics for Compactor. By default, NewCompactorMetrics would be called to initialize metrics unless it is provided. Metrics *CompactorMetrics // UseUncachedIO allows bypassing the page cache when appropriate. @@ -211,7 +214,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer } mergeFunc := opts.MergeFunc if mergeFunc == nil { - mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) + mergeFunc = storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, opts.EnableSTStorage) } maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize if maxBlockChunkSegmentSize == 0 { diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index afe15a5f31..67c1f6d19e 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1422,7 +1422,7 @@ func TestCancelCompactions(t *testing.T) { // Make sure that no blocks were marked as compaction failed. // This checks that the `context.Canceled` error is properly checked at all levels: // - callers should check with errors.Is() instead of ==. - readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger()) + readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger(), false) require.NoError(t, err) blocks, err := readOnlyDB.Blocks() require.NoError(t, err) diff --git a/tsdb/db.go b/tsdb/db.go index 1d73628bfd..b2a780c3e5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -494,15 +494,17 @@ var ErrClosed = errors.New("db already closed") // Current implementation doesn't support concurrency so // all API calls should happen in the same go routine. 
type DBReadOnly struct { - logger *slog.Logger - dir string - sandboxDir string - closers []io.Closer - closed chan struct{} + logger *slog.Logger + dir string + sandboxDir string + closers []io.Closer + closed chan struct{} + stStorageEnabled bool } // OpenDBReadOnly opens DB in the given directory for read only operations. -func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, error) { +// stStorageEnabled should be true when reading blocks that may contain ST data. +func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger, stStorageEnabled bool) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { return nil, fmt.Errorf("opening the db dir: %w", err) } @@ -520,10 +522,11 @@ func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, er } return &DBReadOnly{ - logger: l, - dir: dir, - sandboxDir: sandboxDir, - closed: make(chan struct{}), + logger: l, + dir: dir, + sandboxDir: sandboxDir, + closed: make(chan struct{}), + stStorageEnabled: stStorageEnabled, }, nil } @@ -590,7 +593,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { return nil } -func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQueryable, error) { +func (db *DBReadOnly) loadDataAsQueryable(maxt int64, enableSTStorage bool) (storage.SampleAndChunkQueryable, error) { select { case <-db.closed: return nil, ErrClosed @@ -662,9 +665,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue } db.closers = append(db.closers, head) + dbOpts := DefaultOptions() + dbOpts.EnableSTStorage = enableSTStorage return &DB{ dir: db.dir, logger: db.logger, + opts: dbOpts, blocks: blocks, head: head, blockQuerierFunc: NewBlockQuerier, @@ -675,7 +681,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue // Querier loads the blocks and wal and returns a new querier over the data partition for the given time range. 
// Current implementation doesn't support multiple Queriers. func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) { - q, err := db.loadDataAsQueryable(maxt) + q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled) if err != nil { return nil, err } @@ -685,7 +691,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) { // ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. // Current implementation doesn't support multiple ChunkQueriers. func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { - q, err := db.loadDataAsQueryable(maxt) + q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled) if err != nil { return nil, err } @@ -699,7 +705,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { return nil, ErrClosed default: } - loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory) + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory, db.stStorageEnabled) if err != nil { return nil, err } @@ -813,6 +819,7 @@ func (db *DBReadOnly) Block(blockID string, postingsDecoderFactory PostingsDecod if err != nil { return nil, err } + block.SetSTStorageEnabled(db.stStorageEnabled) db.closers = append(db.closers, block) return block, nil @@ -983,6 +990,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn PD: opts.PostingsDecoderFactory, UseUncachedIO: opts.UseUncachedIO, BlockExcludeFilter: opts.BlockCompactionExcludeFunc, + EnableSTStorage: opts.EnableSTStorage, }) } if err != nil { @@ -1044,6 +1052,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) headOpts.EnableSharding = opts.EnableSharding headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample + headOpts.EnableSTStorage.Store(opts.EnableSTStorage) 
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency @@ -1786,7 +1795,7 @@ func (db *DB) reloadBlocks() (err error) { }() db.mtx.RLock() - loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory) + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory, db.opts.EnableSTStorage) db.mtx.RUnlock() if err != nil { return err @@ -1886,7 +1895,7 @@ func (db *DB) reloadBlocks() (err error) { return nil } -func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { +func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, stStorageEnabled bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { bDirs, err := blockDirs(dir) if err != nil { return nil, nil, fmt.Errorf("find blocks: %w", err) @@ -1908,6 +1917,7 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc. 
corrupted[meta.ULID] = err continue } + block.SetSTStorageEnabled(stStorageEnabled) } blocks = append(blocks, block) } @@ -2433,7 +2443,7 @@ func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { if err != nil { return nil, err } - return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil + return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, db.opts.EnableSTStorage)), nil } func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { diff --git a/tsdb/db_append_v2_test.go b/tsdb/db_append_v2_test.go index 15201d3dc7..3bc657e162 100644 --- a/tsdb/db_append_v2_test.go +++ b/tsdb/db_append_v2_test.go @@ -1507,7 +1507,7 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) { }) for _, enableStStorage := range []bool{false, true} { - t.Run("wal-only,stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) { + t.Run("wal-only-st-"+strconv.FormatBool(enableStStorage), func(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) @@ -1547,7 +1547,7 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) { require.True(t, db.head.initialized()) }) for _, enableStStorage := range []bool{false, true} { - t.Run("existing-block-and-wal,stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) { + t.Run("existing-block-and-wal-st-"+strconv.FormatBool(enableStStorage), func(t *testing.T) { dir := t.TempDir() createBlock(t, dir, genSeries(1, 1, 1000, 6000)) @@ -1981,7 +1981,7 @@ func TestDBReadOnly_AppendV2(t *testing.T) { } // Open a read only db and ensure that the API returns the same result as the normal DB. 
- dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger) + dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger, false) require.NoError(t, err) defer func() { require.NoError(t, dbReadOnly.Close()) }() @@ -2058,7 +2058,7 @@ func TestDBReadOnly_FlushWAL_AppendV2(t *testing.T) { } // Flush WAL. - db, err := OpenDBReadOnly(dbDir, "", logger) + db, err := OpenDBReadOnly(dbDir, "", logger, false) require.NoError(t, err) flush := t.TempDir() @@ -2066,7 +2066,7 @@ func TestDBReadOnly_FlushWAL_AppendV2(t *testing.T) { require.NoError(t, db.Close()) // Reopen the DB from the flushed WAL block. - db, err = OpenDBReadOnly(flush, "", logger) + db, err = OpenDBReadOnly(flush, "", logger, false) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() blocks, err := db.Blocks() @@ -2114,7 +2114,7 @@ func TestDBReadOnly_Querier_NoAlteration_AppendV2(t *testing.T) { spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) { dBDirHash := dirHash(dir) // Bootstrap a RO db from the same dir and set up a querier. - dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil) + dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil, false) require.NoError(t, err) require.Equal(t, chunksCount, countChunks(dir)) q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt) @@ -7512,6 +7512,70 @@ func TestAbortBlockCompactions_AppendV2(t *testing.T) { require.Equal(t, 4, compactions, "expected 4 compactions to be completed") } +// TestCompactHeadWithSTStorage_AppendV2 ensures that when EnableSTStorage is true, +// compacted blocks contain chunks with EncXOROptST encoding for float samples. 
+func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) { + t.Parallel() + + opts := &Options{ + RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond), + NoLockfile: true, + MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), + MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), + WALCompression: compression.Snappy, + EnableSTStorage: true, + } + db := newTestDB(t, withOpts(opts)) + ctx := context.Background() + app := db.AppenderV2(ctx) + + maxt := 100 + for i := range maxt { + // AppendV2 signature: (ref, labels, st, t, v, h, fh, opts) + // st=0 (start timestamp), t=i (sample timestamp) + // TODO(krajorama): verify with non zero st once the API supports it. + _, err := app.Append(0, labels.FromStrings("a", "b"), 0, int64(i), float64(i), nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Compact the Head to create a new block. + require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1))) + // Check that we have exactly one block. + require.Len(t, db.Blocks(), 1) + b := db.Blocks()[0] + + // Open chunk reader and index reader. + chunkr, err := b.Chunks() + require.NoError(t, err) + defer chunkr.Close() + + indexr, err := b.Index() + require.NoError(t, err) + defer indexr.Close() + + // Get postings for the series. 
+ p, err := indexr.Postings(ctx, "a", "b") + require.NoError(t, err) + + chunkCount := 0 + for p.Next() { + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, indexr.Series(p.At(), &builder, &chks)) + + for _, chk := range chks { + c, _, err := chunkr.ChunkOrIterable(chk) + require.NoError(t, err) + require.Equal(t, chunkenc.EncXOROptST, c.Encoding(), + "expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding()) + chunkCount++ + } + } + require.NoError(t, p.Err()) + require.Positive(t, chunkCount, "expected at least one chunk") +} + func TestNewCompactorFunc_AppendV2(t *testing.T) { opts := DefaultOptions() block1 := ulid.MustNew(1, nil) @@ -7543,3 +7607,114 @@ func TestNewCompactorFunc_AppendV2(t *testing.T) { require.Len(t, ulids, 1) require.Equal(t, block2, ulids[0]) } + +// TestDBAppenderV2_STStorage_OutOfOrder verifies that ST storage works correctly +// when samples are appended out of order and can be queried using ChunkQuerier. +func TestDBAppenderV2_STStorage_OutOfOrder(t *testing.T) { + testHistogram := tsdbutil.GenerateTestHistogram(1) + testHistogram.CounterResetHint = histogram.NotCounterReset + + testCases := []struct { + name string + appendSamples []chunks.Sample // Samples in append order (out of order) + expectedSamples []chunks.Sample // Expected samples in time order after query + }{ + { + name: "Float samples out of order", + appendSamples: []chunks.Sample{ + newSample(20, 200, 2.0, nil, nil), // Append second sample first + newSample(10, 100, 1.0, nil, nil), // Append first sample second (OOO) + newSample(30, 300, 3.0, nil, nil), // Append third sample last + newSample(25, 250, 2.5, nil, nil), // Append middle sample (OOO) + }, + expectedSamples: []chunks.Sample{ + newSample(10, 100, 1.0, nil, nil), + newSample(20, 200, 2.0, nil, nil), + newSample(25, 250, 2.5, nil, nil), + newSample(30, 300, 3.0, nil, nil), + }, + }, + { + name: "Histogram samples out of order", + appendSamples: 
[]chunks.Sample{ + newSample(30, 300, 0, testHistogram, nil), // Append third sample first + newSample(10, 100, 0, testHistogram, nil), // Append first sample second (OOO) + newSample(20, 200, 0, testHistogram, nil), // Append second sample last (OOO) + }, + // Histograms don't support ST storage yet, should return 0 for ST + expectedSamples: []chunks.Sample{ + newSample(0, 100, 0, testHistogram, nil), + newSample(0, 200, 0, testHistogram, nil), + newSample(0, 300, 0, testHistogram, nil), + }, + }, + { + name: "Mixed float samples with same ST", + appendSamples: []chunks.Sample{ + newSample(10, 200, 2.0, nil, nil), + newSample(10, 100, 1.0, nil, nil), // OOO with same ST + newSample(10, 300, 3.0, nil, nil), + }, + expectedSamples: []chunks.Sample{ + newSample(10, 100, 1.0, nil, nil), + newSample(10, 200, 2.0, nil, nil), + newSample(10, 300, 3.0, nil, nil), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() + opts.EnableSTStorage = true + db := newTestDB(t, withOpts(opts)) + db.DisableCompactions() + + lbls := labels.FromStrings("foo", "bar") + + // Append samples in the specified (out of order) sequence + for _, s := range tc.appendSamples { + app := db.AppenderV2(context.Background()) + _, err := app.Append(0, lbls, s.ST(), s.T(), s.F(), s.H(), s.FH(), storage.AOptions{}) + require.NoError(t, err, "Appending OOO sample with ST should succeed") + require.NoError(t, app.Commit(), "Committing OOO sample with ST should succeed") + } + + // Query using ChunkQuerier to verify ST values + querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + defer querier.Close() + + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next(), "Should have series") + series := ss.At() + require.NoError(t, ss.Err()) + require.False(t, ss.Next(), 
"Should have only one series") + + // Iterate through chunks and collect samples using storage.ExpandSamples + chunkIt := series.Iterator(nil) + var actualSamples []chunks.Sample + + for chunkIt.Next() { + chk := chunkIt.At() + it := chk.Chunk.Iterator(nil) + samples, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) + actualSamples = append(actualSamples, samples...) + } + require.NoError(t, chunkIt.Err()) + + // Verify samples are in time order with correct values + // Use requireEqualSamplesIgnoreCounterResets to ignore histogram counter reset hints + requireEqualSamples(t, lbls.String(), tc.expectedSamples, actualSamples, requireEqualSamplesIgnoreCounterResets) + + // Additionally verify ST values match expectations + require.Len(t, actualSamples, len(tc.expectedSamples)) + for i, expected := range tc.expectedSamples { + actual := actualSamples[i] + require.Equal(t, expected.ST(), actual.ST(), "Sample %d: ST should match", i) + } + }) + } +} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 13c37eb219..8e03410bd0 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2553,7 +2553,7 @@ func TestDBReadOnly(t *testing.T) { } // Open a read only db and ensure that the API returns the same result as the normal DB. - dbReadOnly, err := OpenDBReadOnly(dbDir, "", nil) + dbReadOnly, err := OpenDBReadOnly(dbDir, "", nil, false) require.NoError(t, err) defer func() { require.NoError(t, dbReadOnly.Close()) }() @@ -2609,7 +2609,7 @@ func TestDBReadOnly(t *testing.T) { func TestDBReadOnlyClosing(t *testing.T) { t.Parallel() sandboxDir := t.TempDir() - db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{})) + db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{}), false) require.NoError(t, err) // The sandboxDir was there. require.DirExists(t, db.sandboxDir) @@ -2648,7 +2648,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { } // Flush WAL. 
- db, err := OpenDBReadOnly(dbDir, "", nil) + db, err := OpenDBReadOnly(dbDir, "", nil, false) require.NoError(t, err) flush := t.TempDir() @@ -2656,7 +2656,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { require.NoError(t, db.Close()) // Reopen the DB from the flushed WAL block. - db, err = OpenDBReadOnly(flush, "", nil) + db, err = OpenDBReadOnly(flush, "", nil, false) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() blocks, err := db.Blocks() @@ -2704,7 +2704,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) { spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) { dBDirHash := dirHash(dir) // Bootstrap a RO db from the same dir and set up a querier. - dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil) + dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil, false) require.NoError(t, err) require.Equal(t, chunksCount, countChunks(dir)) q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt) @@ -9626,3 +9626,66 @@ func TestStaleSeriesCompactionWithZeroSeries(t *testing.T) { // Should still have no blocks since there was nothing to compact. require.Empty(t, db.Blocks()) } + +// TestCompactHeadWithSTStorage ensures that when EnableSTStorage is true, +// compacted blocks contain chunks with EncXOROptST encoding for float samples +// even when using the original Appender (which does not support start timestamps). 
+func TestCompactHeadWithSTStorage(t *testing.T) { + t.Parallel() + + opts := &Options{ + RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond), + NoLockfile: true, + MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), + MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), + WALCompression: compression.Snappy, + EnableSTStorage: true, + } + db := newTestDB(t, withOpts(opts)) + ctx := context.Background() + app := db.Appender(ctx) + + maxt := 100 + for i := range maxt { + // Original Appender signature: (ref, labels, t, v) + _, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Compact the Head to create a new block. + require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1))) + // Check that we have exactly one block. + require.Len(t, db.Blocks(), 1) + b := db.Blocks()[0] + + // Open chunk reader and index reader. + chunkr, err := b.Chunks() + require.NoError(t, err) + defer chunkr.Close() + + indexr, err := b.Index() + require.NoError(t, err) + defer indexr.Close() + + // Get postings for the series. 
+ p, err := indexr.Postings(ctx, "a", "b") + require.NoError(t, err) + + chunkCount := 0 + for p.Next() { + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, indexr.Series(p.At(), &builder, &chks)) + + for _, chk := range chks { + c, _, err := chunkr.ChunkOrIterable(chk) + require.NoError(t, err) + require.Equal(t, chunkenc.EncXOROptST, c.Encoding(), + "expected EncXOROptST encoding when EnableSTStorage=true even with original Appender, got %s", c.Encoding()) + chunkCount++ + } + } + require.NoError(t, p.Err()) + require.Positive(t, chunkCount, "expected at least one chunk") +} diff --git a/tsdb/head.go b/tsdb/head.go index 19c2538b12..917bd666d3 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -160,6 +160,11 @@ type HeadOptions struct { OutOfOrderTimeWindow atomic.Int64 OutOfOrderCapMax atomic.Int64 + // EnableSTStorage determines whether databases (WAL/WBL, tsdb, + // agent) should set a Start Time value per sample. + // Represents 'st-storage' feature flag. + EnableSTStorage atomic.Bool + ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string @@ -200,11 +205,6 @@ type HeadOptions struct { // NOTE(bwplotka): This feature might be deprecated and removed once PROM-60 // is implemented. EnableMetadataWALRecords bool - - // EnableSTStorage determines whether agent DB should write a Start Timestamp (ST) - // per sample to WAL. - // TODO(bwplotka): Implement this option as per PROM-60, currently it's noop. 
- EnableSTStorage bool } const ( @@ -1386,7 +1386,7 @@ func (h *Head) truncateWAL(mint int64) error { } h.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint, h.opts.EnableSTStorage); err != nil { + if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint, h.opts.EnableSTStorage.Load()); err != nil { h.metrics.checkpointCreationFail.Inc() var cerr *chunks.CorruptionErr if errors.As(err, &cerr) { @@ -1680,7 +1680,7 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match } if h.wal != nil { - enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage} + enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage.Load()} if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { return err } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 846ad476e3..41bcdf1c95 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -185,6 +185,7 @@ func (h *Head) appender() *headAppender { typesInBatch: h.getTypeMap(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, + storeST: h.opts.EnableSTStorage.Load(), }, } } @@ -412,6 +413,7 @@ type headAppenderBase struct { appendID, cleanupAppendIDsBelow uint64 closed bool + storeST bool } type headAppender struct { headAppenderBase @@ -1387,7 +1389,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. 
var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + ok, chunkCreated, mmapRefs = series.insert(a.storeST, s.ST, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1431,7 +1433,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte default: newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) - ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + ok, chunkCreated = series.append(a.storeST, s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1492,7 +1494,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama,ywwg): Pass ST when available in WAL. + ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1540,7 +1543,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) } - ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) + // TODO(krajorama,ywwg): pass ST when available in WAL. 
+ ok, chunkCreated = series.appendHistogram(a.storeST, 0, s.T, s.H, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1601,7 +1605,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama,ywwg): Pass ST when available in WAL. + ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1649,7 +1654,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) } - ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) + // TODO(krajorama,ywwg): pass ST when available in WAL. + ok, chunkCreated = series.appendFloatHistogram(a.storeST, 0, s.T, s.FH, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1799,18 +1805,18 @@ func (a *headAppenderBase) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. 
-func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(storeST bool, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger) + c, mmapRefs = s.cutNewOOOHeadChunk(storeST, t, chunkDiskMapper, logger) chunkCreated = true } - ok := c.chunk.Insert(t, v, h, fh) + ok := c.chunk.Insert(st, t, v, h, fh) if ok { if chunkCreated || t < c.minTime { c.minTime = t @@ -1833,13 +1839,12 @@ type chunkOpts struct { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // Series lock must be held when calling. -func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o) +func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } - // TODO(krajorama): pass ST. 
- s.app.Append(0, t, v) + s.app.Append(st, t, v) c.maxTime = t @@ -1859,14 +1864,14 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.HistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1881,8 +1886,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed + newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed s.lastHistogramValue = h s.lastFloatHistogramValue = nil @@ -1917,14 +1921,14 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. 
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1939,8 +1943,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed. + newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed. s.lastHistogramValue = nil s.lastFloatHistogramValue = fh @@ -2164,8 +2167,8 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. // The caller must ensure that s is locked and s.ooo is not nil. 
-func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { - ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger) +func (s *memSeries) cutNewOOOHeadChunk(storeST bool, mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { + ref := s.mmapCurrentOOOHeadChunk(storeST, chunkDiskMapper, logger) s.ooo.oooHeadChunk = &oooHeadChunk{ chunk: NewOOOChunk(), @@ -2177,12 +2180,12 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk } // s must be locked when calling. -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef { +func (s *memSeries) mmapCurrentOOOHeadChunk(storeST bool, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { // OOO is not enabled or there is no head chunk, so nothing to m-map here. return nil } - chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, math.MinInt64, math.MaxInt64) if err != nil { handleChunkWriteError(err) return nil diff --git a/tsdb/head_append_v2.go b/tsdb/head_append_v2.go index 87b62df536..769b55c262 100644 --- a/tsdb/head_append_v2.go +++ b/tsdb/head_append_v2.go @@ -95,6 +95,7 @@ func (h *Head) appenderV2() *headAppenderV2 { typesInBatch: h.getTypeMap(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, + storeST: h.opts.EnableSTStorage.Load(), }, } } @@ -141,7 +142,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i } // TODO(bwplotka): Handle ST natively (as per PROM-60). 
- if a.head.opts.EnableSTAsZeroSample && st != 0 { + if st != 0 && a.head.opts.EnableSTAsZeroSample { a.bestEffortAppendSTZeroSample(s, ls, st, t, h, fh) } @@ -177,7 +178,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i // we do not need to check for the difference between "unknown // series" and "known series with stNone". } - appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder) + appErr = a.appendFloat(s, st, t, v, opts.RejectOutOfOrder) } // Handle append error, if any. if appErr != nil { @@ -218,7 +219,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i return storage.SeriesRef(s.ref), partialErr } -func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error { +func (a *headAppenderV2) appendFloat(s *memSeries, st, t int64, v float64, fastRejectOOO bool) error { s.Lock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -239,7 +240,7 @@ func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejec } b := a.getCurrentBatch(stFloat, s.ref) - b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v}) + b.floats = append(b.floats, record.RefSample{Ref: s.ref, ST: st, T: t, V: v}) b.floatSeries = append(b.floatSeries, s) return nil } @@ -366,7 +367,7 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La } err = a.appendHistogram(s, st, zeroHistogram, true) default: - err = a.appendFloat(s, st, 0, true) + err = a.appendFloat(s, 0, st, 0, true) } if err != nil { diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index ba756f801f..623273e6ba 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -2925,13 +2925,15 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot_AppenderV2(t *testing.T) { // TestWBLReplay checks the replay at a low level. 
func TestWBLReplay_AppenderV2(t *testing.T) { for name, scenario := range sampleTypeScenarios { - t.Run(name, func(t *testing.T) { - testWBLReplayAppenderV2(t, scenario) - }) + for _, enableSTstorage := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/st-storage=%v", name, enableSTstorage), func(t *testing.T) { + testWBLReplayAppenderV2(t, scenario, enableSTstorage) + }) + } } } -func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { +func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableSTstorage bool) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) @@ -2942,6 +2944,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { opts.ChunkRange = 1000 opts.ChunkDirRoot = dir opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) + opts.EnableSTStorage.Store(enableSTstorage) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -2993,7 +2996,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { require.False(t, ok) require.NotNil(t, ms) - chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(h.opts.EnableSTStorage.Load(), math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chks, 1) @@ -4754,3 +4757,138 @@ func TestHeadAppenderV2_Append_HistogramStalenessConversionMetrics(t *testing.T) }) } } + +// TestHeadAppender_STStorage verifies that when EnableSTStorage is true, +// start timestamps are properly stored in chunks and returned by queries. +// This test uses AppenderV2 which has native ST support. 
+func TestHeadAppenderV2_STStorage(t *testing.T) { + testHistogram := tsdbutil.GenerateTestHistogram(1) + testHistogram.CounterResetHint = histogram.NotCounterReset + + type sampleData struct { + st int64 + ts int64 + fSample float64 + h *histogram.Histogram + } + + testCases := []struct { + name string + samples []sampleData + expectedSTs []int64 // Expected ST values + isHistogram bool + }{ + { + name: "Float samples with ST", + samples: []sampleData{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + {st: 30, ts: 300, fSample: 3.0}, + }, + expectedSTs: []int64{10, 20, 30}, + isHistogram: false, + }, + { + name: "Float samples with varying ST", + samples: []sampleData{ + {st: 5, ts: 100, fSample: 1.0}, + {st: 5, ts: 200, fSample: 2.0}, // Same ST + {st: 150, ts: 300, fSample: 3.0}, // Different ST + }, + expectedSTs: []int64{5, 5, 150}, + isHistogram: false, + }, + { + name: "Histogram samples", + samples: []sampleData{ + {st: 10, ts: 100, h: testHistogram}, + {st: 20, ts: 200, h: testHistogram}, + {st: 30, ts: 300, h: testHistogram}, + }, + // Histograms don't support ST storage yet, should return 0 + expectedSTs: []int64{0, 0, 0}, + isHistogram: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(true) + h, _ := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + + // Use AppenderV2 which has native ST support + a := h.AppenderV2(context.Background()) + for _, s := range tc.samples { + _, err := a.Append(0, lbls, s.st, s.ts, s.fSample, s.h, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + // Verify ST values are stored in chunks + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer chkReader.Close() + + 
p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + + // Read chunks and verify ST values + var actualSTs []int64 + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + it := chk.Iterator(nil) + for it.Next() != chunkenc.ValNone { + st := it.AtST() + actualSTs = append(actualSTs, st) + } + require.NoError(t, it.Err()) + } + + // Verify expected ST values + if tc.isHistogram { + require.Equal(t, tc.expectedSTs, actualSTs, "Histogram samples should return 0 for ST") + } else { + require.Equal(t, tc.expectedSTs, actualSTs, "Float samples should have ST stored") + } + + // Also verify via querier + q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + defer q.Close() + + ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next()) + series := ss.At() + require.NoError(t, ss.Err()) + + seriesIt := series.Iterator(nil) + var queriedSTs []int64 + for seriesIt.Next() != chunkenc.ValNone { + st := seriesIt.AtST() + queriedSTs = append(queriedSTs, st) + } + require.NoError(t, seriesIt.Err()) + + // Verify querier returns same ST values + require.Equal(t, tc.expectedSTs, queriedSTs, "Querier should return same ST values as chunk iterator") + }) + } +} diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f0a1331fbb..934932ee5b 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -444,6 +444,17 @@ type ChunkReaderWithCopy interface { ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) } +// ChunkReaderWithSTStorage is an optional interface that ChunkReaders can +// implement to indicate whether ST (start time) storage is enabled. 
+type ChunkReaderWithSTStorage interface { + STStorageEnabled() bool +} + +// STStorageEnabled returns whether ST storage is enabled in the Head. +func (h *headChunkReader) STStorageEnabled() bool { + return h.head.opts.EnableSTStorage.Load() +} + // ChunkOrIterableWithCopy returns the chunk for the reference number. // If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk. func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index cf55973a01..46d129510e 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -33,7 +33,7 @@ func TestMemSeries_chunk(t *testing.T) { appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) { for i := start; i < end; i += chunkStep { - ok, _ := s.append(i, float64(i), 0, chunkOpts{ + ok, _ := s.append(false, 0, i, float64(i), 0, chunkOpts{ chunkDiskMapper: cdm, chunkRange: chunkRange, samplesPerChunk: DefaultSamplesPerChunk, diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 81cb236801..85ec9a09a9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -349,7 +349,7 @@ func BenchmarkLoadWLs(b *testing.B) { for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false) - s.append(c.mmappedChunkT, 42, 0, cOpts) + s.append(false, 0, c.mmappedChunkT, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with // the sample at c.mmappedChunkT is mmapped. 
@@ -1492,7 +1492,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0, cOpts) + ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } s.mmapChunks(chunkDiskMapper) @@ -1642,7 +1642,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { if tc.mmappedChunks > 0 { headStart = (tc.mmappedChunks + 1) * chunkRange for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep { - ok, _ := series.append(int64(i), float64(i), 0, cOpts) + ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } series.mmapChunks(chunkDiskMapper) @@ -1652,7 +1652,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { series.headChunks = nil } else { for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep { - ok, _ := series.append(int64(i), float64(i), 0, cOpts) + ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed: %d", i) } } @@ -2183,7 +2183,41 @@ func TestComputeChunkEndTime(t *testing.T) { } } +// TestMemSeries_append tests float appending with various storeST/st combinations. 
func TestMemSeries_append(t *testing.T) { + scenarios := map[string]struct { + storeST bool + stFunc func(ts int64) int64 // Function to compute st from ts + }{ + "storeST=false st=0": { + storeST: false, + stFunc: func(_ int64) int64 { return 0 }, + }, + "storeST=true st=0": { + storeST: true, + stFunc: func(_ int64) int64 { return 0 }, + }, + "storeST=true st=ts": { + storeST: true, + stFunc: func(ts int64) int64 { return ts }, + }, + "storeST=true st=ts-100": { + storeST: true, + stFunc: func(ts int64) int64 { return ts - 100 }, + }, + "storeST=false st=ts (st ignored)": { + storeST: false, + stFunc: func(ts int64) int64 { return ts }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testMemSeriesAppend(t, scenario.storeST, scenario.stFunc) + }) + } +} + +func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) @@ -2202,20 +2236,20 @@ func TestMemSeries_append(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. 
- ok, chunkCreated := s.append(998, 1, 0, cOpts) + ok, chunkCreated := s.append(storeST, stFunc(998), 998, 1, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0, cOpts) + ok, chunkCreated = s.append(storeST, stFunc(999), 999, 2, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") s.mmapChunks(chunkDiskMapper) - ok, chunkCreated = s.append(1000, 3, 0, cOpts) + ok, chunkCreated = s.append(storeST, stFunc(1000), 1000, 3, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0, cOpts) + ok, chunkCreated = s.append(storeST, stFunc(1001), 1001, 4, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -2229,7 +2263,8 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts) + ts := 1001 + int64(i) + ok, _ := s.append(storeST, stFunc(ts), ts, float64(i), 0, cOpts) require.True(t, ok, "append failed") } s.mmapChunks(chunkDiskMapper) @@ -2244,7 +2279,41 @@ func TestMemSeries_append(t *testing.T) { } } +// TestMemSeries_appendHistogram tests histogram appending with various storeST/st combinations. 
func TestMemSeries_appendHistogram(t *testing.T) { + scenarios := map[string]struct { + storeST bool + stFunc func(ts int64) int64 // Function to compute st from ts + }{ + "storeST=false st=0": { + storeST: false, + stFunc: func(_ int64) int64 { return 0 }, + }, + "storeST=true st=0": { + storeST: true, + stFunc: func(_ int64) int64 { return 0 }, + }, + "storeST=true st=ts": { + storeST: true, + stFunc: func(ts int64) int64 { return ts }, + }, + "storeST=true st=ts-100": { + storeST: true, + stFunc: func(ts int64) int64 { return ts - 100 }, + }, + "storeST=false st=ts (st ignored)": { + storeST: false, + stFunc: func(ts int64) int64 { return ts }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testMemSeriesAppendHistogram(t, scenario.storeST, scenario.stFunc) + }) + } +} + +func testMemSeriesAppendHistogram(t *testing.T, storeST bool, stFunc func(ts int64) int64) { dir := t.TempDir() // This is usually taken from the Head, but passing manually here. chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) @@ -2270,19 +2339,19 @@ func TestMemSeries_appendHistogram(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. 
- ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts) + ok, chunkCreated := s.appendHistogram(storeST, stFunc(998), 998, histograms[0], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts) + ok, chunkCreated = s.appendHistogram(storeST, stFunc(999), 999, histograms[1], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts) + ok, chunkCreated = s.appendHistogram(storeST, stFunc(1000), 1000, histograms[2], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts) + ok, chunkCreated = s.appendHistogram(storeST, stFunc(1001), 1001, histograms[3], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -2293,7 +2362,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range") require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range") - ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts) + ok, chunkCreated = s.appendHistogram(storeST, stFunc(1002), 1002, histogramWithOneMoreBucket, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") @@ -2328,7 +2397,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { var nextTs int64 var totalAppendedSamples int for i := range samplesPerChunk / 4 { - ok, _ := s.append(nextTs, float64(i), 0, cOpts) + ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "slow sample %d was not appended", i) nextTs += slowRate totalAppendedSamples++ @@ -2337,12 +2406,12 @@ func 
TestMemSeries_append_atVariableRate(t *testing.T) { // Suddenly, the rate increases and we receive a sample every millisecond. for i := range math.MaxUint16 { - ok, _ := s.append(nextTs, float64(i), 0, cOpts) + ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "quick sample %d was not appended", i) nextTs++ totalAppendedSamples++ } - ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts) + ok, chunkCreated := s.append(false, 0, DefaultBlockDuration, float64(0), 0, cOpts) require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") @@ -2371,18 +2440,18 @@ func TestGCChunkAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, cOpts) + ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -2427,18 +2496,18 @@ func TestGCSeriesAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) // Appending 2 samples for the first chunk. 
- ok, chunkCreated := s.append(0, 0, 0, cOpts) + ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -2775,10 +2844,10 @@ func TestHeadReadWriterRepair(t *testing.T) { require.True(t, created, "series was not created") for i := range 7 { - ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) + ok, chunkCreated := s.append(false, 0, int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) + ok, chunkCreated = s.append(false, 0, int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") h.chunkDiskMapper.CutNewFile() @@ -3118,7 +3187,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) - ok, _ := s.append(0, 0, 0, cOpts) + ok, _ := s.append(false, 0, 0, 0, 0, cOpts) require.True(t, ok, "Series append failed.") require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an 
appendID after append with appendID=0.") } @@ -3678,7 +3747,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false) for i := range 7 { - ok, _ := s.append(int64(i), float64(i), 0, cOpts) + ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } @@ -5569,7 +5638,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { require.False(t, ok) require.NotNil(t, ms) - chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chks, 1) @@ -7267,3 +7336,137 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) { }) } } + +// TestHeadAppender_STStorage_Disabled verifies that when EnableSTStorage is false, +// start timestamps are NOT stored in chunks (AtST returns 0). +func TestHeadAppender_STStorage_Disabled(t *testing.T) { + type sampleData struct { + st int64 + ts int64 + fSample float64 + } + + samples := []sampleData{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + {st: 30, ts: 300, fSample: 3.0}, + } + + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(false) // Explicitly disable ST storage + h, _ := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + + // Use AppenderV2 to append samples with ST values + a := h.AppenderV2(context.Background()) + for _, s := range samples { + _, err := a.Append(0, lbls, s.st, s.ts, s.fSample, nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + // Verify ST values are NOT stored (should all be 0) + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer 
chkReader.Close() + + p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + + // Read chunks and verify all ST values are 0 + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + it := chk.Iterator(nil) + for it.Next() != chunkenc.ValNone { + st := it.AtST() + require.Equal(t, int64(0), st, "ST should be 0 when EnableSTStorage is false") + } + require.NoError(t, it.Err()) + } +} + +// TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding +// is used based on EnableSTStorage setting. +func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) { + samples := []struct { + st int64 + ts int64 + fSample float64 + }{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + } + + for _, enableST := range []bool{false, true} { + t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(enableST) + h, _ := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + a := h.Appender(context.Background()) + for _, s := range samples { + _, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st) + require.NoError(t, err) + _, err = a.Append(0, lbls, s.ts, s.fSample) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + // Check chunk encoding + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer chkReader.Close() + + p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := 
p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + require.NotEmpty(t, chkMetas) + + // Verify encoding + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + encoding := chk.Encoding() + if enableST { + // Should use ST-capable encoding + require.Equal(t, chunkenc.EncXOROptST, encoding, + "Expected ST-capable encoding when EnableSTStorage is true") + } else { + // Should use regular XOR encoding + require.Equal(t, chunkenc.EncXOR, encoding, + "Expected regular XOR encoding when EnableSTStorage is false") + } + } + }) + } +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 6e9b80060c..174d428b58 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -115,8 +115,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() wg.Add(concurrency) + storeST := h.opts.EnableSTStorage.Load() for i := range concurrency { - processors[i].setup() + processors[i].setup(storeST) go func(wp *walSubsetProcessor) { missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) @@ -576,6 +577,7 @@ type walSubsetProcessor struct { input chan walSubsetProcessorInputItem output chan []record.RefSample histogramsOutput chan []histogramRecord + storeST bool } type walSubsetProcessorInputItem struct { @@ -586,10 +588,11 @@ type walSubsetProcessorInputItem struct { deletedSeriesRefs []chunks.HeadSeriesRef } -func (wp *walSubsetProcessor) setup() { +func (wp *walSubsetProcessor) setup(storeST bool) { wp.input = make(chan walSubsetProcessorInputItem, 300) wp.output = make(chan []record.RefSample, 300) wp.histogramsOutput = make(chan []histogramRecord, 300) + wp.storeST = storeST } func (wp *walSubsetProcessor) closeAndDrain() { @@ -666,7 +669,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp h.numStaleSeries.Dec() } 
- if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { + if _, chunkCreated := ms.append(wp.storeST, s.ST, s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() _ = ms.mmapChunks(h.chunkDiskMapper) @@ -703,14 +706,16 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum) } - _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + _, chunkCreated = ms.appendHistogram(wp.storeST, 0, s.t, s.h, 0, appendChunkOpts) } else { newlyStale = value.IsStaleNaN(s.fh.Sum) if ms.lastFloatHistogramValue != nil { newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum) } - _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) + // TODO(krajorama,ywwg): Pass ST when available in WBL. 
+ _, chunkCreated = ms.appendFloatHistogram(wp.storeST, 0, s.t, s.fh, 0, appendChunkOpts) } if newlyStale { h.numStaleSeries.Inc() @@ -779,8 +784,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() wg.Add(concurrency) + storeST := h.opts.EnableSTStorage.Load() for i := range concurrency { - processors[i].setup() + processors[i].setup(storeST) go func(wp *wblSubsetProcessor) { missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h) @@ -1025,6 +1031,7 @@ type wblSubsetProcessor struct { input chan wblSubsetProcessorInputItem output chan []record.RefSample histogramsOutput chan []histogramRecord + storeST bool } type wblSubsetProcessorInputItem struct { @@ -1033,10 +1040,11 @@ type wblSubsetProcessorInputItem struct { histogramSamples []histogramRecord } -func (wp *wblSubsetProcessor) setup() { +func (wp *wblSubsetProcessor) setup(storeST bool) { wp.output = make(chan []record.RefSample, 300) wp.histogramsOutput = make(chan []histogramRecord, 300) wp.input = make(chan wblSubsetProcessorInputItem, 300) + wp.storeST = storeST } func (wp *wblSubsetProcessor) closeAndDrain() { @@ -1096,7 +1104,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR missingSeries[s.Ref] = struct{}{} continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) + ok, chunkCreated, _ := ms.insert(wp.storeST, s.ST, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -1124,9 +1132,11 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR var chunkCreated bool var ok bool if s.h != nil { - ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama,ywwg): Pass ST when available in WBL. 
+ ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) } else { - ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) } if chunkCreated { h.metrics.chunksCreated.Inc() @@ -1400,7 +1410,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { // Assuming 100 bytes (overestimate) per exemplar, that's ~1MB. maxExemplarsPerRecord := 10000 batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord) - enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage} + enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage.Load()} flushExemplars := func() error { if len(batch) == 0 { return nil diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index f9746c4c61..40c4210abc 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -34,14 +34,13 @@ func NewOOOChunk() *OOOChunk { // Insert inserts the sample such that order is maintained. // Returns false if insert was not possible due to the same timestamp already existing. -func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { +func (o *OOOChunk) Insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { // Although out-of-order samples can be out-of-order amongst themselves, we // are opinionated and expect them to be usually in-order meaning we could // try to append at the end first if the new timestamp is higher than the // last known timestamp. if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t { - // TODO(krajorama): pass ST. 
- o.samples = append(o.samples, sample{0, t, v, h, fh}) + o.samples = append(o.samples, sample{st, t, v, h, fh}) return true } @@ -50,8 +49,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog if i >= len(o.samples) { // none found. append it at the end - // TODO(krajorama): pass ST. - o.samples = append(o.samples, sample{0, t, v, h, fh}) + o.samples = append(o.samples, sample{st, t, v, h, fh}) return true } @@ -63,8 +61,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog // Expand length by 1 to make room. use a zero sample, we will overwrite it anyway. o.samples = append(o.samples, sample{}) copy(o.samples[i+1:], o.samples[i:]) - // TODO(krajorama): pass ST. - o.samples[i] = sample{0, t, v, h, fh} + o.samples[i] = sample{st, t, v, h, fh} return true } @@ -76,7 +73,7 @@ func (o *OOOChunk) NumSamples() int { // ToEncodedChunks returns chunks with the samples in the OOOChunk. // //nolint:revive -func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) { +func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memChunk, err error) { if len(o.samples) == 0 { return nil, nil } @@ -96,10 +93,13 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error if s.t > maxt { break } - encoding := chunkenc.EncXOR - if s.h != nil { + encoding := chunkenc.ValFloat.ChunkEncoding(storeST) + switch { + case s.h != nil: + // TODO(krajorama): use ST capable histogram chunk. encoding = chunkenc.EncHistogram - } else if s.fh != nil { + case s.fh != nil: + // TODO(krajorama): use ST capable float histogram chunk. 
encoding = chunkenc.EncFloatHistogram } @@ -111,15 +111,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) } cmint = s.t - switch encoding { - case chunkenc.EncXOR: - chunk = chunkenc.NewXORChunk() - case chunkenc.EncHistogram: - chunk = chunkenc.NewHistogramChunk() - case chunkenc.EncFloatHistogram: - chunk = chunkenc.NewFloatHistogramChunk() - default: - chunk = chunkenc.NewXORChunk() + chunk, err = chunkenc.NewEmptyChunk(encoding) + if err != nil { + // This should never happen. No point using a default type as + // calling the wrong append function would panic. + return chks, err } app, err = chunk.Appender() if err != nil { @@ -127,18 +123,17 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error } } switch encoding { - case chunkenc.EncXOR: - // TODO(krajorama): pass ST. - app.Append(0, s.t, s.f) + case chunkenc.EncXOR, chunkenc.EncXOROptST: + app.Append(s.st, s.t, s.f) case chunkenc.EncHistogram: + // TODO(krajorama): handle ST capable histogram chunk. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) var ( newChunk chunkenc.Chunk recoded bool ) - // TODO(krajorama): pass ST. - newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false) + newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.st, s.t, s.h, false) if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) @@ -147,14 +142,14 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error chunk = newChunk } case chunkenc.EncFloatHistogram: + // TODO(krajorama): handle ST capable float histogram chunk. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. 
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender) var ( newChunk chunkenc.Chunk recoded bool ) - // TODO(krajorama): pass ST. - newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false) + newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.st, s.t, s.fh, false) if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 5d2347c2d7..c4b84e0d68 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S *chks = (*chks)[:0] if s.ooo != nil { - return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) + return getOOOSeriesChunks(s, oh.head.opts.EnableSTStorage.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) } *chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks) return nil @@ -88,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S // // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. 
-func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { +func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -106,7 +106,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. - chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, c.minTime, c.maxTime) if err != nil { handleChunkWriteError(err) return nil @@ -230,6 +230,11 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu return c, it, err } +// STStorageEnabled returns whether ST storage is enabled in the Head. +func (cr *HeadAndOOOChunkReader) STStorageEnabled() bool { + return cr.head.opts.EnableSTStorage.Load() +} + // ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy // behaviour is only implemented for the in-order head chunk. 
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { @@ -347,7 +352,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, } var lastMmapRef chunks.ChunkDiskMapperRef - mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger) + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.opts.EnableSTStorage.Load(), head.chunkDiskMapper, head.logger) if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref} @@ -481,7 +486,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l return nil } - return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) + return getOOOSeriesChunks(s, ir.ch.head.opts.EnableSTStorage.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) } func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index 99cd357a30..5862ed3bf1 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -31,10 +31,11 @@ const testMaxSize int = 32 func valEven(pos int) int64 { return int64(pos*2 + 2) } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values func valOdd(pos int) int64 { return int64(pos*2 + 1) } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals. 
-func makeEvenSampleSlice(n int, sampleFunc func(ts int64) sample) []sample { +func makeEvenSampleSlice(n int, sampleFunc func(st, ts int64) sample) []sample { s := make([]sample, n) for i := range n { - s[i] = sampleFunc(valEven(i)) + ts := valEven(i) + s[i] = sampleFunc(ts, ts) // Use ts as st for consistency } return s } @@ -43,23 +44,50 @@ func makeEvenSampleSlice(n int, sampleFunc func(ts int64) sample) []sample { // - Number of pre-existing samples anywhere from 0 to testMaxSize-1. // - Insert new sample before first pre-existing samples, after the last, and anywhere in between. // - With a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves. +// - With st=0 and st!=0 to verify ordering is based on sample.t, not sample.st. func TestOOOInsert(t *testing.T) { scenarios := map[string]struct { - sampleFunc func(ts int64) sample + sampleFunc func(st, ts int64) sample }{ - "float": { - sampleFunc: func(ts int64) sample { - return sample{t: ts, f: float64(ts)} + "float st=0": { + sampleFunc: func(st, ts int64) sample { + return sample{st: 0, t: ts, f: float64(ts)} }, }, - "integer histogram": { - sampleFunc: func(ts int64) sample { - return sample{t: ts, h: tsdbutil.GenerateTestHistogram(ts)} + "float st=ts": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts, t: ts, f: float64(ts)} }, }, - "float histogram": { - sampleFunc: func(ts int64) sample { - return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)} + "float st=ts-100": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts - 100, t: ts, f: float64(ts)} + }, + }, + "float st descending while t ascending": { + // st values go in opposite direction of t to ensure ordering is by t + sampleFunc: func(st, ts int64) sample { + return sample{st: 1000 - ts, t: ts, f: float64(ts)} + }, + }, + "integer histogram st=0": { + sampleFunc: func(st, ts int64) sample { + return sample{st: 0, t: ts, 
h: tsdbutil.GenerateTestHistogram(ts)} + }, + }, + "integer histogram st=ts": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts, t: ts, h: tsdbutil.GenerateTestHistogram(ts)} + }, + }, + "float histogram st=0": { + sampleFunc: func(st, ts int64) sample { + return sample{st: 0, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)} + }, + }, + "float histogram st=ts": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)} }, }, } @@ -71,7 +99,7 @@ func TestOOOInsert(t *testing.T) { } func testOOOInsert(t *testing.T, - sampleFunc func(ts int64) sample, + sampleFunc func(st, ts int64) sample, ) { for numPreExisting := 0; numPreExisting <= testMaxSize; numPreExisting++ { // For example, if we have numPreExisting 2, then: @@ -84,19 +112,22 @@ func testOOOInsert(t *testing.T, chunk := NewOOOChunk() chunk.samples = make([]sample, numPreExisting) chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc) - newSample := sampleFunc(valOdd(insertPos)) - chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh) + ts := valOdd(insertPos) + newSample := sampleFunc(ts, ts) // Use ts as st for consistency + chunk.Insert(newSample.st, newSample.t, newSample.f, newSample.h, newSample.fh) var expSamples []sample // Our expected new samples slice, will be first the original samples. for i := 0; i < insertPos; i++ { - expSamples = append(expSamples, sampleFunc(valEven(i))) + ts := valEven(i) + expSamples = append(expSamples, sampleFunc(ts, ts)) } // Then the new sample. expSamples = append(expSamples, newSample) // Followed by any original samples that were pushed back by the new one. 
for i := insertPos; i < numPreExisting; i++ { - expSamples = append(expSamples, sampleFunc(valEven(i))) + ts := valEven(i) + expSamples = append(expSamples, sampleFunc(ts, ts)) } require.Equal(t, expSamples, chunk.samples, "numPreExisting %d, insertPos %d", numPreExisting, insertPos) @@ -107,23 +138,50 @@ func testOOOInsert(t *testing.T, // TestOOOInsertDuplicate tests the correct behavior when inserting a sample that is a duplicate of any // pre-existing samples, with between 1 and testMaxSize pre-existing samples and // with a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves. +// With st=0 and st!=0 to verify duplicate detection is based on sample.t, not sample.st. func TestOOOInsertDuplicate(t *testing.T) { scenarios := map[string]struct { - sampleFunc func(ts int64) sample + sampleFunc func(st, ts int64) sample }{ - "float": { - sampleFunc: func(ts int64) sample { - return sample{t: ts, f: float64(ts)} + "float st=0": { + sampleFunc: func(st, ts int64) sample { + return sample{st: 0, t: ts, f: float64(ts)} }, }, - "integer histogram": { - sampleFunc: func(ts int64) sample { - return sample{t: ts, h: tsdbutil.GenerateTestHistogram(ts)} + "float st=ts": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts, t: ts, f: float64(ts)} }, }, - "float histogram": { - sampleFunc: func(ts int64) sample { - return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)} + "float st=ts-100": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts - 100, t: ts, f: float64(ts)} + }, + }, + "float st descending while t ascending": { + // st values go in opposite direction of t to ensure duplicate detection is by t + sampleFunc: func(st, ts int64) sample { + return sample{st: 1000 - ts, t: ts, f: float64(ts)} + }, + }, + "integer histogram st=0": { + sampleFunc: func(st, ts int64) sample { + return sample{st: 0, t: ts, h: tsdbutil.GenerateTestHistogram(ts)} + 
}, + }, + "integer histogram st=ts": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts, t: ts, h: tsdbutil.GenerateTestHistogram(ts)} + }, + }, + "float histogram st=0": { + sampleFunc: func(st, ts int64) sample { + return sample{st: 0, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)} + }, + }, + "float histogram st=ts": { + sampleFunc: func(st, ts int64) sample { + return sample{st: ts, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)} }, }, } @@ -135,7 +193,7 @@ func TestOOOInsertDuplicate(t *testing.T) { } func testOOOInsertDuplicate(t *testing.T, - sampleFunc func(ts int64) sample, + sampleFunc func(st, ts int64) sample, ) { for num := 1; num <= testMaxSize; num++ { for dupPos := 0; dupPos < num; dupPos++ { @@ -145,7 +203,7 @@ func testOOOInsertDuplicate(t *testing.T, dupSample := chunk.samples[dupPos] dupSample.f = 0.123 - ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh) + ok := chunk.Insert(dupSample.st, dupSample.t, dupSample.f, dupSample.h, dupSample.fh) expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change. 
require.False(t, ok) @@ -252,17 +310,17 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { for _, s := range tc.samples { switch s.Type() { case chunkenc.ValFloat: - oooChunk.Insert(s.t, s.f, nil, nil) + oooChunk.Insert(s.st, s.t, s.f, nil, nil) case chunkenc.ValHistogram: - oooChunk.Insert(s.t, 0, s.h.Copy(), nil) + oooChunk.Insert(s.st, s.t, 0, s.h.Copy(), nil) case chunkenc.ValFloatHistogram: - oooChunk.Insert(s.t, 0, nil, s.fh.Copy()) + oooChunk.Insert(s.st, s.t, 0, nil, s.fh.Copy()) default: t.Fatalf("unexpected sample type %d", s.Type()) } } - chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chunks, err := oooChunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chunks, len(tc.expectedChunks), "number of chunks") sampleIndex := 0 @@ -308,3 +366,87 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { }) } } + +// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with storeST=true and storeST=false for float samples. +// When storeST=true, st values are preserved; when storeST=false, AtST() returns 0. +// TODO(@krajorama): Add histogram test cases once ST storage is implemented for histograms. 
+func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) { + testCases := map[string]struct { + samples []sample + }{ + "floats with st=0": { + samples: []sample{ + {st: 0, t: 1000, f: 43.0}, + {st: 0, t: 1100, f: 42.0}, + }, + }, + "floats with st=t": { + samples: []sample{ + {st: 1000, t: 1000, f: 43.0}, + {st: 1100, t: 1100, f: 42.0}, + }, + }, + "floats with st=t-100": { + samples: []sample{ + {st: 900, t: 1000, f: 43.0}, + {st: 1000, t: 1100, f: 42.0}, + }, + }, + "floats with varying st": { + samples: []sample{ + {st: 500, t: 1000, f: 43.0}, + {st: 1100, t: 1100, f: 42.0}, // st == t + {st: 0, t: 1200, f: 41.0}, // st == 0 + }, + }, + } + + storageScenarios := []struct { + name string + storeST bool + expectedEncoding chunkenc.Encoding + }{ + {"storeST=true", true, chunkenc.EncXOROptST}, + {"storeST=false", false, chunkenc.EncXOR}, + } + + for name, tc := range testCases { + for _, ss := range storageScenarios { + t.Run(name+"/"+ss.name, func(t *testing.T) { + oooChunk := OOOChunk{} + for _, s := range tc.samples { + oooChunk.Insert(s.st, s.t, s.f, nil, nil) + } + + chunks, err := oooChunk.ToEncodedChunks(ss.storeST, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + require.Len(t, chunks, 1, "number of chunks") + + c := chunks[0] + require.Equal(t, ss.expectedEncoding, c.chunk.Encoding(), "chunk encoding") + require.Equal(t, tc.samples[0].t, c.minTime, "chunk minTime") + require.Equal(t, tc.samples[len(tc.samples)-1].t, c.maxTime, "chunk maxTime") + + // Verify samples can be read back with correct st and t values. + it := c.chunk.Iterator(nil) + sampleIndex := 0 + for it.Next() == chunkenc.ValFloat { + gotT, gotF := it.At() + gotST := it.AtST() + + if ss.storeST { + // When storeST=true, st values should be preserved. + require.Equal(t, tc.samples[sampleIndex].st, gotST, "sample %d st", sampleIndex) + } else { + // When storeST=false, AtST() should return 0. 
+ require.Equal(t, int64(0), gotST, "sample %d st should be 0 when storeST=false", sampleIndex) + } + require.Equal(t, tc.samples[sampleIndex].t, gotT, "sample %d t", sampleIndex) + require.Equal(t, tc.samples[sampleIndex].f, gotF, "sample %d f", sampleIndex) + sampleIndex++ + } + require.Equal(t, len(tc.samples), sampleIndex, "number of samples") + }) + } + } +} diff --git a/tsdb/querier.go b/tsdb/querier.go index ac7a14e1b3..7caeba8b7b 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -695,6 +695,15 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err } +// stStorageEnabled returns whether ST storage is enabled in the ChunkReader. +// Returns false if the ChunkReader doesn't implement ChunkReaderWithSTStorage. +func (p *populateWithDelGenericSeriesIterator) stStorageEnabled() bool { + if cr, ok := p.cr.(ChunkReaderWithSTStorage); ok { + return cr.STStorageEnabled() + } + return false +} + type blockSeriesEntry struct { chunks ChunkReader blockID ulid.ULID @@ -885,12 +894,17 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { st, t int64 err error ) + newChunk, err = valueType.NewChunk(p.stStorageEnabled()) + if err != nil { + p.err = fmt.Errorf("create new chunk while re-encoding: %w", err) + return false + } + if app, err = newChunk.Appender(); err != nil { + p.err = fmt.Errorf("get chunk appender while re-encoding: %w", err) + return false + } switch valueType { case chunkenc.ValHistogram: - newChunk = chunkenc.NewHistogramChunk() - if app, err = newChunk.Appender(); err != nil { - break - } for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { if vt != chunkenc.ValHistogram { err = fmt.Errorf("found value type %v in histogram chunk", vt) @@ -905,10 +919,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { } } case chunkenc.ValFloat: - newChunk = chunkenc.NewXORChunk() - if 
app, err = newChunk.Appender(); err != nil { - break - } for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { if vt != chunkenc.ValFloat { err = fmt.Errorf("found value type %v in float chunk", vt) @@ -920,10 +930,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { app.Append(st, t, v) } case chunkenc.ValFloatHistogram: - newChunk = chunkenc.NewFloatHistogramChunk() - if app, err = newChunk.Appender(); err != nil { - break - } for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { if vt != chunkenc.ValFloatHistogram { err = fmt.Errorf("found value type %v in histogram chunk", vt) @@ -1000,7 +1006,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) } cmint = p.currDelIter.AtT() - if currentChunk, err = currentValueType.NewChunk(); err != nil { + if currentChunk, err = currentValueType.NewChunk(p.stStorageEnabled()); err != nil { break } if app, err = currentChunk.Appender(); err != nil {