feat(tsdb): start time written optionally to chunks

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2026-02-17 17:11:36 +01:00
parent 12f5f2de3c
commit 363e89bb6d
No known key found for this signature in database
GPG key ID: 47A8F9CE80FD7C7F
35 changed files with 1103 additions and 242 deletions

View file

@ -844,10 +844,16 @@ func main() {
template.RegisterFeatures(features.DefaultRegistry)
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
storeST = cfg.tsdb.EnableSTStorage
)
if agentMode {
storeST = cfg.agent.EnableSTStorage
}
var (
remoteStorage = remote.NewStorageWithStoreST(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels, storeST)
fanoutStorage = storage.NewFanoutWithStoreST(logger, storeST, localStorage, remoteStorage)
)
var (

View file

@ -84,11 +84,11 @@ func getCompatibleBlockDuration(maxBlockDuration int64) int64 {
return blockDuration
}
func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string) (returnErr error) {
func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string, stStorageEnabled bool) (returnErr error) {
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
mint = blockDuration * (mint / blockDuration)
db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
db, err := tsdb.OpenDBReadOnly(outputDir, "", nil, stStorageEnabled)
if err != nil {
return err
}
@ -228,13 +228,13 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
return nil
}
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) (err error) {
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string, stStorageEnabled bool) (err error) {
p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps.
maxt, mint, err := getMinAndMaxTimestamps(p)
if err != nil {
return fmt.Errorf("getting min and max timestamp: %w", err)
}
if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels); err != nil {
if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels, stStorageEnabled); err != nil {
return fmt.Errorf("block creation: %w", err)
}
return nil

View file

@ -735,7 +735,7 @@ after_eof 1 2
outputDir := t.TempDir()
err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration, test.Labels)
err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration, test.Labels, false)
if !test.IsOk {
require.Error(t, err, test.Description)

View file

@ -282,6 +282,7 @@ func main() {
importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.")
importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool()
importSTStorage := importCmd.Flag("st-storage", "Enable storage of sample start times in blocks.").Hidden().Bool()
maxBlockDuration := importCmd.Flag("max-block-duration", "Maximum duration created blocks may span. Anything less than 2h is ignored.").Hidden().PlaceHolder("<duration>").Duration()
openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.")
openMetricsLabels := openMetricsImportCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times. Example --label=label_name=label_value").StringMap()
@ -452,7 +453,7 @@ func main() {
os.Exit(checkErr(dumpTSDBData(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics, promtoolParser)))
// TODO(aSquare14): Work on adding support for custom block size.
case openMetricsImportCmd.FullCommand():
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration, *openMetricsLabels))
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration, *openMetricsLabels, *importSTStorage))
case importRulesCmd.FullCommand():
os.Exit(checkErr(importRules(serverURL, httpRoundTripper, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, model.UTF8Validation, *importRulesFiles...)))

View file

@ -333,7 +333,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
}
func listBlocks(path string, humanReadable bool) error {
db, err := tsdb.OpenDBReadOnly(path, "", nil)
db, err := tsdb.OpenDBReadOnly(path, "", nil, true)
if err != nil {
return err
}
@ -388,7 +388,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string {
}
func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
db, err := tsdb.OpenDBReadOnly(path, "", nil)
db, err := tsdb.OpenDBReadOnly(path, "", nil, true)
if err != nil {
return nil, nil, err
}
@ -707,7 +707,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
type SeriesSetFormatter func(series storage.SeriesSet) error
func dumpTSDBData(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter, p parser.Parser) (err error) {
db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil, true)
if err != nil {
return err
}
@ -845,7 +845,7 @@ func checkErr(err error) int {
return 0
}
func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int {
func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string, stStorageEnabled bool) int {
var buf []byte
info, err := os.Stat(path)
if err != nil {
@ -870,7 +870,7 @@ func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxB
return checkErr(fmt.Errorf("create output dir: %w", err))
}
return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels, stStorageEnabled))
}
func displayHistogram(dataType string, datas []int, total int) {

View file

@ -53,7 +53,7 @@ func TestTSDBDumpOpenMetricsRoundTripPipe(t *testing.T) {
}()
// Import samples from OM format
code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{})
code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{}, false)
require.Equal(t, 0, code)
db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil)
require.NoError(t, err)

View file

@ -228,7 +228,7 @@ func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) {
dbDir := t.TempDir()
// Import samples from OM format
err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour, map[string]string{})
err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour, map[string]string{}, false)
require.NoError(t, err)
db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil)
require.NoError(t, err)

View file

@ -31,6 +31,7 @@ type fanout struct {
primary Storage
secondaries []Storage
storeST bool
}
// NewFanout returns a new fanout Storage, which proxies reads and writes
@ -43,10 +44,16 @@ type fanout struct {
//
// NOTE: In the case of Prometheus, it treats all remote storages as secondary / best effort.
func NewFanout(logger *slog.Logger, primary Storage, secondaries ...Storage) Storage {
return NewFanoutWithStoreST(logger, false, primary, secondaries...)
}
// NewFanoutWithStoreST returns a new fanout Storage with start timestamp storage enabled or disabled.
func NewFanoutWithStoreST(logger *slog.Logger, storeST bool, primary Storage, secondaries ...Storage) Storage {
return &fanout{
logger: logger,
primary: primary,
secondaries: secondaries,
storeST: storeST,
}
}
@ -120,7 +127,7 @@ func (f *fanout) ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) {
}
secondaries = append(secondaries, querier)
}
return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMergerWithStoreST(ChainedSeriesMerge, f.storeST)), nil
}
func (f *fanout) Appender(ctx context.Context) Appender {

View file

@ -722,6 +722,11 @@ func (h *samplesIteratorHeap) Pop() any {
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return NewCompactingChunkSeriesMergerWithStoreST(mergeFunc, false)
}
// NewCompactingChunkSeriesMergerWithStoreST is like NewCompactingChunkSeriesMerger, but uses storeST when re-encoding.
func NewCompactingChunkSeriesMergerWithStoreST(mergeFunc VerticalSeriesMergeFunc, storeST bool) VerticalChunkSeriesMergeFunc {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
@ -736,6 +741,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
return &compactChunkIterator{
mergeFunc: mergeFunc,
iterators: iterators,
storeST: storeST,
}
},
}
@ -748,6 +754,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
iterators []chunks.Iterator
storeST bool
h chunkIteratorHeap
@ -813,7 +820,7 @@ func (c *compactChunkIterator) Next() bool {
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil)
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...), c.storeST).Iterator(nil)
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false

View file

@ -29,6 +29,7 @@ type sampleAndChunkQueryableClient struct {
requiredMatchers []*labels.Matcher
readRecent bool
callback startTimeCallback
storeST bool
}
// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
@ -38,6 +39,7 @@ func NewSampleAndChunkQueryableClient(
requiredMatchers []*labels.Matcher,
readRecent bool,
callback startTimeCallback,
storeST bool,
) storage.SampleAndChunkQueryable {
return &sampleAndChunkQueryableClient{
client: c,
@ -46,6 +48,7 @@ func NewSampleAndChunkQueryableClient(
requiredMatchers: requiredMatchers,
readRecent: readRecent,
callback: callback,
storeST: storeST,
}
}
@ -84,6 +87,7 @@ func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
},
storeST: c.storeST,
}
if c.readRecent {
return cq, nil
@ -229,13 +233,14 @@ func (*querier) Close() error {
// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier.
type chunkQuerier struct {
querier
storeST bool
}
// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client.
// It uses remote.querier.Select so it supports external labels and required matchers if specified.
func (q *chunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
// TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket).
return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...))
return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...), q.storeST)
}
// Note strings in toFilter must be sorted.

View file

@ -527,6 +527,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
tc.requiredMatchers,
tc.readRecent,
tc.callback,
false,
)
q, err := c.Querier(tc.mint, tc.maxt)
require.NoError(t, err)

View file

@ -56,7 +56,8 @@ type Storage struct {
logger *slog.Logger
mtx sync.Mutex
rws *WriteStorage
rws *WriteStorage
storeST bool
// For reads.
queryables []storage.SampleAndChunkQueryable
@ -67,6 +68,11 @@ var _ storage.Storage = &Storage{}
// NewStorage returns a remote.Storage.
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage {
return NewStorageWithStoreST(l, reg, stCallback, walDir, flushDeadline, sm, enableTypeAndUnitLabels, false)
}
// NewStorageWithStoreST returns a remote.Storage with start timestamp storage enabled or disabled.
func NewStorageWithStoreST(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels, storeST bool) *Storage {
if l == nil {
l = promslog.NewNopLogger()
}
@ -77,6 +83,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC
logger: logger,
deduper: deduper,
localStartTimeCallback: stCallback,
storeST: storeST,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, enableTypeAndUnitLabels)
return s
@ -139,6 +146,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
labelsToEqualityMatchers(rrConf.RequiredMatchers),
rrConf.ReadRecent,
s.localStartTimeCallback,
s.storeST,
))
}
s.queryables = queryables
@ -187,7 +195,7 @@ func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
}
queriers = append(queriers, q)
}
return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, s.storeST)), nil
}
// Appender implements storage.Storage.

View file

@ -289,11 +289,12 @@ func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series {
type seriesSetToChunkSet struct {
SeriesSet
storeST bool
}
// NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples.
func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet {
return &seriesSetToChunkSet{SeriesSet: chk}
func NewSeriesSetToChunkSet(chk SeriesSet, storeST bool) ChunkSeriesSet {
return &seriesSetToChunkSet{SeriesSet: chk, storeST: storeST}
}
func (c *seriesSetToChunkSet) Next() bool {
@ -304,7 +305,7 @@ func (c *seriesSetToChunkSet) Next() bool {
}
func (c *seriesSetToChunkSet) At() ChunkSeries {
return NewSeriesToChunkEncoder(c.SeriesSet.At())
return NewSeriesToChunkEncoder(c.SeriesSet.At(), c.storeST)
}
func (c *seriesSetToChunkSet) Err() error {
@ -313,13 +314,14 @@ func (c *seriesSetToChunkSet) Err() error {
type seriesToChunkEncoder struct {
Series
storeST bool
}
const seriesToChunkEncoderSplit = 120
// NewSeriesToChunkEncoder encodes samples to chunks with 120 samples limit.
func NewSeriesToChunkEncoder(series Series) ChunkSeries {
return &seriesToChunkEncoder{series}
func NewSeriesToChunkEncoder(series Series, storeST bool) ChunkSeries {
return &seriesToChunkEncoder{Series: series, storeST: storeST}
}
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
@ -342,10 +344,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
st := seriesIter.AtST()
if typ != lastType || i >= seriesToChunkEncoderSplit {
// Create a new chunk if the sample type changed or too many samples in the current one.
chks = appendChunk(chks, mint, maxt, chk)
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
chk, err = typ.NewChunk(s.storeST)
if err != nil {
return errChunksIterator{err: err}
}
@ -360,19 +363,17 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
lastType = typ
var (
st, t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
)
switch typ {
case chunkenc.ValFloat:
t, v = seriesIter.At()
st = seriesIter.AtST()
app.Append(st, t, v)
case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram(nil)
st = seriesIter.AtST()
newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false)
if err != nil {
return errChunksIterator{err: err}
@ -388,7 +389,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
}
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram(nil)
st = seriesIter.AtST()
newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false)
if err != nil {
return errChunksIterator{err: err}

View file

@ -601,7 +601,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
}
}
series := NewListSeries(lbs, copiedSamples)
encoder := NewSeriesToChunkEncoder(series)
encoder := NewSeriesToChunkEncoder(series, false)
require.Equal(t, lbs, encoder.Labels())
chks, err := ExpandChunks(encoder.Iterator(nil))

View file

@ -425,6 +425,13 @@ func (pb *Block) Size() int64 {
return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta
}
// SetSTStorageEnabled sets whether ST storage is enabled on the block's chunk reader.
func (pb *Block) SetSTStorageEnabled(enabled bool) {
if cr, ok := pb.chunkr.(*chunks.Reader); ok {
cr.SetSTStorageEnabled(enabled)
}
}
// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

View file

@ -76,6 +76,8 @@ type Chunk interface {
Bytes() []byte
// Encoding returns the encoding type of the chunk.
// If the chunk is capable of storing ST (start timestamps), it should
// return the appropriate encoding type (e.g., EncXOROptST).
Encoding() Encoding
// Appender returns an appender to append samples to the chunk.
@ -189,9 +191,12 @@ func (v ValueType) String() string {
}
}
func (v ValueType) ChunkEncoding() Encoding {
func (v ValueType) ChunkEncoding(storeST bool) Encoding {
switch v {
case ValFloat:
if storeST {
return EncXOROptST
}
return EncXOR
case ValHistogram:
return EncHistogram
@ -202,17 +207,8 @@ func (v ValueType) ChunkEncoding() Encoding {
}
}
func (v ValueType) NewChunk() (Chunk, error) {
switch v {
case ValFloat:
return NewXORChunk(), nil
case ValHistogram:
return NewHistogramChunk(), nil
case ValFloatHistogram:
return NewFloatHistogramChunk(), nil
default:
return nil, fmt.Errorf("value type %v unsupported", v)
}
func (v ValueType) NewChunk(storeST bool) (Chunk, error) {
return NewEmptyChunk(v.ChunkEncoding(storeST))
}
// MockSeriesIterator returns an iterator for a mock series with custom
@ -399,6 +395,7 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
}
// NewEmptyChunk returns an empty chunk for the given encoding.
// TODO(krajorama): support storeST for histogram and float histogram chunks when they are implemented.
func NewEmptyChunk(e Encoding) (Chunk, error) {
switch e {
case EncXOR:

View file

@ -135,7 +135,9 @@ type Meta struct {
}
// ChunkFromSamples requires all samples to have the same type.
// TODO(krajorama): test with ST when chunk formats support it.
// It is not efficient and meant for testing purposes only.
// It scans the samples to determine whether any sample has ST set and
// creates a chunk accordingly.
func ChunkFromSamples(s []Sample) (Meta, error) {
return ChunkFromSamplesGeneric(SampleSlice(s))
}
@ -154,7 +156,17 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
}
sampleType := s.Get(0).Type()
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding())
hasST := false
for i := range s.Len() {
if s.Get(i).ST() != 0 {
hasST = true
break
}
}
// Request storing ST in the chunk if available.
c, err := sampleType.NewChunk(hasST)
if err != nil {
return Meta{}, err
}
@ -627,6 +639,8 @@ type Reader struct {
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
stStorageEnabled bool
}
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
@ -695,6 +709,16 @@ func (s *Reader) Size() int64 {
return s.size
}
// SetSTStorageEnabled sets whether ST storage is enabled for this reader.
func (s *Reader) SetSTStorageEnabled(enabled bool) {
s.stStorageEnabled = enabled
}
// STStorageEnabled returns whether ST storage is enabled for this reader.
func (s *Reader) STStorageEnabled() bool {
return s.stStorageEnabled
}
// ChunkOrIterable returns a chunk from a given reference.
func (s *Reader) ChunkOrIterable(meta Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack()

View file

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
@ -58,3 +59,35 @@ func TestWriterWithDefaultSegmentSize(t *testing.T) {
require.NoError(t, err)
require.Len(t, d, 1, "expected only one segment to be created to hold both chunks")
}
func TestChunkFromSamplesWithST(t *testing.T) {
// Create samples with explicit ST (start timestamp) values
samples := []Sample{
sample{t: 10, f: 11, st: 5},
sample{t: 20, f: 12, st: 15},
sample{t: 30, f: 13, st: 25},
}
chk, err := ChunkFromSamples(samples)
require.NoError(t, err)
require.NotNil(t, chk.Chunk)
// Verify MinTime and MaxTime
require.Equal(t, int64(10), chk.MinTime)
require.Equal(t, int64(30), chk.MaxTime)
// Iterate over the chunk and verify ST values are preserved
it := chk.Chunk.Iterator(nil)
idx := 0
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
require.Equal(t, chunkenc.ValFloat, vt)
ts, v := it.At()
st := it.AtST()
require.Equal(t, samples[idx].ST(), st, "ST mismatch at index %d", idx)
require.Equal(t, samples[idx].T(), ts, "T mismatch at index %d", idx)
require.Equal(t, samples[idx].F(), v, "F mismatch at index %d", idx)
idx++
}
require.NoError(t, it.Err())
require.Equal(t, len(samples), idx, "expected all samples to be iterated")
}

View file

@ -178,6 +178,9 @@ type LeveledCompactorOptions struct {
// It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction.
EnableOverlappingCompaction bool
// EnableSTStorage determines whether compaction should re-encode chunks with start timestamps.
EnableSTStorage bool
// Metrics is set of metrics for Compactor. By default, NewCompactorMetrics would be called to initialize metrics unless it is provided.
Metrics *CompactorMetrics
// UseUncachedIO allows bypassing the page cache when appropriate.
@ -211,7 +214,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
}
mergeFunc := opts.MergeFunc
if mergeFunc == nil {
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
mergeFunc = storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, opts.EnableSTStorage)
}
maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize
if maxBlockChunkSegmentSize == 0 {

View file

@ -1422,7 +1422,7 @@ func TestCancelCompactions(t *testing.T) {
// Make sure that no blocks were marked as compaction failed.
// This checks that the `context.Canceled` error is properly checked at all levels:
// - callers should check with errors.Is() instead of ==.
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger(), false)
require.NoError(t, err)
blocks, err := readOnlyDB.Blocks()
require.NoError(t, err)

View file

@ -494,15 +494,17 @@ var ErrClosed = errors.New("db already closed")
// Current implementation doesn't support concurrency so
// all API calls should happen in the same goroutine.
type DBReadOnly struct {
logger *slog.Logger
dir string
sandboxDir string
closers []io.Closer
closed chan struct{}
logger *slog.Logger
dir string
sandboxDir string
closers []io.Closer
closed chan struct{}
stStorageEnabled bool
}
// OpenDBReadOnly opens DB in the given directory for read only operations.
func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, error) {
// stStorageEnabled should be true when reading blocks that may contain ST data.
func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger, stStorageEnabled bool) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, fmt.Errorf("opening the db dir: %w", err)
}
@ -520,10 +522,11 @@ func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, er
}
return &DBReadOnly{
logger: l,
dir: dir,
sandboxDir: sandboxDir,
closed: make(chan struct{}),
logger: l,
dir: dir,
sandboxDir: sandboxDir,
closed: make(chan struct{}),
stStorageEnabled: stStorageEnabled,
}, nil
}
@ -590,7 +593,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
return nil
}
func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQueryable, error) {
func (db *DBReadOnly) loadDataAsQueryable(maxt int64, enableSTStorage bool) (storage.SampleAndChunkQueryable, error) {
select {
case <-db.closed:
return nil, ErrClosed
@ -662,9 +665,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
db.closers = append(db.closers, head)
dbOpts := DefaultOptions()
dbOpts.EnableSTStorage = enableSTStorage
return &DB{
dir: db.dir,
logger: db.logger,
opts: dbOpts,
blocks: blocks,
head: head,
blockQuerierFunc: NewBlockQuerier,
@ -675,7 +681,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
// Querier loads the blocks and wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) {
q, err := db.loadDataAsQueryable(maxt)
q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled)
if err != nil {
return nil, err
}
@ -685,7 +691,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) {
// ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range.
// Current implementation doesn't support multiple ChunkQueriers.
func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := db.loadDataAsQueryable(maxt)
q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled)
if err != nil {
return nil, err
}
@ -699,7 +705,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return nil, ErrClosed
default:
}
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory)
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory, db.stStorageEnabled)
if err != nil {
return nil, err
}
@ -813,6 +819,7 @@ func (db *DBReadOnly) Block(blockID string, postingsDecoderFactory PostingsDecod
if err != nil {
return nil, err
}
block.SetSTStorageEnabled(db.stStorageEnabled)
db.closers = append(db.closers, block)
return block, nil
@ -983,6 +990,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
PD: opts.PostingsDecoderFactory,
UseUncachedIO: opts.UseUncachedIO,
BlockExcludeFilter: opts.BlockCompactionExcludeFunc,
EnableSTStorage: opts.EnableSTStorage,
})
}
if err != nil {
@ -1044,6 +1052,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.EnableSharding = opts.EnableSharding
headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample
headOpts.EnableSTStorage.Store(opts.EnableSTStorage)
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords
if opts.WALReplayConcurrency > 0 {
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
@ -1786,7 +1795,7 @@ func (db *DB) reloadBlocks() (err error) {
}()
db.mtx.RLock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory)
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory, db.opts.EnableSTStorage)
db.mtx.RUnlock()
if err != nil {
return err
@ -1886,7 +1895,7 @@ func (db *DB) reloadBlocks() (err error) {
return nil
}
func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, stStorageEnabled bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, fmt.Errorf("find blocks: %w", err)
@ -1908,6 +1917,7 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.
corrupted[meta.ULID] = err
continue
}
block.SetSTStorageEnabled(stStorageEnabled)
}
blocks = append(blocks, block)
}
@ -2433,7 +2443,7 @@ func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
if err != nil {
return nil, err
}
return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, db.opts.EnableSTStorage)), nil
}
func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {

View file

@ -1507,7 +1507,7 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) {
})
for _, enableStStorage := range []bool{false, true} {
t.Run("wal-only,stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
t.Run("wal-only-st-"+strconv.FormatBool(enableStStorage), func(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
@ -1547,7 +1547,7 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) {
require.True(t, db.head.initialized())
})
for _, enableStStorage := range []bool{false, true} {
t.Run("existing-block-and-wal,stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
t.Run("existing-block-and-wal-st-"+strconv.FormatBool(enableStStorage), func(t *testing.T) {
dir := t.TempDir()
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
@ -1981,7 +1981,7 @@ func TestDBReadOnly_AppendV2(t *testing.T) {
}
// Open a read only db and ensure that the API returns the same result as the normal DB.
dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger, false)
require.NoError(t, err)
defer func() { require.NoError(t, dbReadOnly.Close()) }()
@ -2058,7 +2058,7 @@ func TestDBReadOnly_FlushWAL_AppendV2(t *testing.T) {
}
// Flush WAL.
db, err := OpenDBReadOnly(dbDir, "", logger)
db, err := OpenDBReadOnly(dbDir, "", logger, false)
require.NoError(t, err)
flush := t.TempDir()
@ -2066,7 +2066,7 @@ func TestDBReadOnly_FlushWAL_AppendV2(t *testing.T) {
require.NoError(t, db.Close())
// Reopen the DB from the flushed WAL block.
db, err = OpenDBReadOnly(flush, "", logger)
db, err = OpenDBReadOnly(flush, "", logger, false)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
blocks, err := db.Blocks()
@ -2114,7 +2114,7 @@ func TestDBReadOnly_Querier_NoAlteration_AppendV2(t *testing.T) {
spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
dBDirHash := dirHash(dir)
// Bootstrap a RO db from the same dir and set up a querier.
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil, false)
require.NoError(t, err)
require.Equal(t, chunksCount, countChunks(dir))
q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
@ -7512,6 +7512,70 @@ func TestAbortBlockCompactions_AppendV2(t *testing.T) {
require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
}
// TestCompactHeadWithSTStorage_AppendV2 ensures that when EnableSTStorage is true,
// compacted blocks contain chunks with EncXOROptST encoding for float samples.
func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) {
	t.Parallel()

	db := newTestDB(t, withOpts(&Options{
		RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
		NoLockfile:        true,
		MinBlockDuration:  int64(time.Hour * 2 / time.Millisecond),
		MaxBlockDuration:  int64(time.Hour * 2 / time.Millisecond),
		WALCompression:    compression.Snappy,
		EnableSTStorage:   true,
	}))
	ctx := context.Background()

	const numSamples = 100
	appender := db.AppenderV2(ctx)
	for i := 0; i < numSamples; i++ {
		// AppendV2 signature: (ref, labels, st, t, v, h, fh, opts);
		// st=0 (start timestamp), t=i (sample timestamp).
		// TODO(krajorama): verify with non zero st once the API supports it.
		_, err := appender.Append(0, labels.FromStrings("a", "b"), 0, int64(i), float64(i), nil, nil, storage.AOptions{})
		require.NoError(t, err)
	}
	require.NoError(t, appender.Commit())

	// Compact the Head into a persisted block and make sure exactly one exists.
	require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, numSamples-1)))
	require.Len(t, db.Blocks(), 1)
	block := db.Blocks()[0]

	// Open chunk and index readers for the freshly written block.
	cr, err := block.Chunks()
	require.NoError(t, err)
	defer cr.Close()

	ir, err := block.Index()
	require.NoError(t, err)
	defer ir.Close()

	postings, err := ir.Postings(ctx, "a", "b")
	require.NoError(t, err)

	// Walk every chunk of every matching series and check its encoding.
	seen := 0
	for postings.Next() {
		var (
			builder labels.ScratchBuilder
			metas   []chunks.Meta
		)
		require.NoError(t, ir.Series(postings.At(), &builder, &metas))
		for _, meta := range metas {
			c, _, err := cr.ChunkOrIterable(meta)
			require.NoError(t, err)
			require.Equal(t, chunkenc.EncXOROptST, c.Encoding(),
				"expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding())
			seen++
		}
	}
	require.NoError(t, postings.Err())
	require.Positive(t, seen, "expected at least one chunk")
}
func TestNewCompactorFunc_AppendV2(t *testing.T) {
opts := DefaultOptions()
block1 := ulid.MustNew(1, nil)
@ -7543,3 +7607,114 @@ func TestNewCompactorFunc_AppendV2(t *testing.T) {
require.Len(t, ulids, 1)
require.Equal(t, block2, ulids[0])
}
// TestDBAppenderV2_STStorage_OutOfOrder verifies that ST storage works correctly
// when samples are appended out of order and can be queried using ChunkQuerier.
// Each sub-case appends samples in a scrambled order and expects the query side
// to return them sorted by sample timestamp with their stored ST values intact.
func TestDBAppenderV2_STStorage_OutOfOrder(t *testing.T) {
	testHistogram := tsdbutil.GenerateTestHistogram(1)
	testHistogram.CounterResetHint = histogram.NotCounterReset

	testCases := []struct {
		name            string
		appendSamples   []chunks.Sample // Samples in append order (out of order)
		expectedSamples []chunks.Sample // Expected samples in time order after query
	}{
		// newSample argument order appears to be (st, t, v, h, fh), matching
		// the s.ST(), s.T(), s.F(), s.H(), s.FH() accessors used below.
		{
			name: "Float samples out of order",
			appendSamples: []chunks.Sample{
				newSample(20, 200, 2.0, nil, nil), // Append second sample first
				newSample(10, 100, 1.0, nil, nil), // Append first sample second (OOO)
				newSample(30, 300, 3.0, nil, nil), // Append third sample last
				newSample(25, 250, 2.5, nil, nil), // Append middle sample (OOO)
			},
			expectedSamples: []chunks.Sample{
				newSample(10, 100, 1.0, nil, nil),
				newSample(20, 200, 2.0, nil, nil),
				newSample(25, 250, 2.5, nil, nil),
				newSample(30, 300, 3.0, nil, nil),
			},
		},
		{
			name: "Histogram samples out of order",
			appendSamples: []chunks.Sample{
				newSample(30, 300, 0, testHistogram, nil), // Append third sample first
				newSample(10, 100, 0, testHistogram, nil), // Append first sample second (OOO)
				newSample(20, 200, 0, testHistogram, nil), // Append second sample last (OOO)
			},
			// Histograms don't support ST storage yet, should return 0 for ST
			expectedSamples: []chunks.Sample{
				newSample(0, 100, 0, testHistogram, nil),
				newSample(0, 200, 0, testHistogram, nil),
				newSample(0, 300, 0, testHistogram, nil),
			},
		},
		{
			name: "Mixed float samples with same ST",
			appendSamples: []chunks.Sample{
				newSample(10, 200, 2.0, nil, nil),
				newSample(10, 100, 1.0, nil, nil), // OOO with same ST
				newSample(10, 300, 3.0, nil, nil),
			},
			expectedSamples: []chunks.Sample{
				newSample(10, 100, 1.0, nil, nil),
				newSample(10, 200, 2.0, nil, nil),
				newSample(10, 300, 3.0, nil, nil),
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			opts := DefaultOptions()
			// Window wide enough (300 min in ms) to accept every OOO sample above.
			opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
			opts.EnableSTStorage = true
			db := newTestDB(t, withOpts(opts))
			// Keep head chunks in place so the query path is deterministic.
			db.DisableCompactions()

			lbls := labels.FromStrings("foo", "bar")

			// Append samples in the specified (out of order) sequence.
			// Each sample gets its own appender+commit so earlier commits make
			// later samples genuinely out of order.
			for _, s := range tc.appendSamples {
				app := db.AppenderV2(context.Background())
				_, err := app.Append(0, lbls, s.ST(), s.T(), s.F(), s.H(), s.FH(), storage.AOptions{})
				require.NoError(t, err, "Appending OOO sample with ST should succeed")
				require.NoError(t, app.Commit(), "Committing OOO sample with ST should succeed")
			}

			// Query using ChunkQuerier to verify ST values
			querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64)
			require.NoError(t, err)
			defer querier.Close()

			ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
			require.True(t, ss.Next(), "Should have series")
			series := ss.At()
			require.NoError(t, ss.Err())
			require.False(t, ss.Next(), "Should have only one series")

			// Iterate through chunks and collect samples using storage.ExpandSamples
			chunkIt := series.Iterator(nil)
			var actualSamples []chunks.Sample
			for chunkIt.Next() {
				chk := chunkIt.At()
				it := chk.Chunk.Iterator(nil)
				samples, err := storage.ExpandSamples(it, newSample)
				require.NoError(t, err)
				actualSamples = append(actualSamples, samples...)
			}
			require.NoError(t, chunkIt.Err())

			// Verify samples are in time order with correct values
			// Use requireEqualSamplesIgnoreCounterResets to ignore histogram counter reset hints
			requireEqualSamples(t, lbls.String(), tc.expectedSamples, actualSamples, requireEqualSamplesIgnoreCounterResets)

			// Additionally verify ST values match expectations
			require.Len(t, actualSamples, len(tc.expectedSamples))
			for i, expected := range tc.expectedSamples {
				actual := actualSamples[i]
				require.Equal(t, expected.ST(), actual.ST(), "Sample %d: ST should match", i)
			}
		})
	}
}

View file

@ -2553,7 +2553,7 @@ func TestDBReadOnly(t *testing.T) {
}
// Open a read only db and ensure that the API returns the same result as the normal DB.
dbReadOnly, err := OpenDBReadOnly(dbDir, "", nil)
dbReadOnly, err := OpenDBReadOnly(dbDir, "", nil, false)
require.NoError(t, err)
defer func() { require.NoError(t, dbReadOnly.Close()) }()
@ -2609,7 +2609,7 @@ func TestDBReadOnly(t *testing.T) {
func TestDBReadOnlyClosing(t *testing.T) {
t.Parallel()
sandboxDir := t.TempDir()
db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{}))
db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{}), false)
require.NoError(t, err)
// The sandboxDir was there.
require.DirExists(t, db.sandboxDir)
@ -2648,7 +2648,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
}
// Flush WAL.
db, err := OpenDBReadOnly(dbDir, "", nil)
db, err := OpenDBReadOnly(dbDir, "", nil, false)
require.NoError(t, err)
flush := t.TempDir()
@ -2656,7 +2656,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.NoError(t, db.Close())
// Reopen the DB from the flushed WAL block.
db, err = OpenDBReadOnly(flush, "", nil)
db, err = OpenDBReadOnly(flush, "", nil, false)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
blocks, err := db.Blocks()
@ -2704,7 +2704,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
dBDirHash := dirHash(dir)
// Bootstrap a RO db from the same dir and set up a querier.
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil, false)
require.NoError(t, err)
require.Equal(t, chunksCount, countChunks(dir))
q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
@ -9626,3 +9626,66 @@ func TestStaleSeriesCompactionWithZeroSeries(t *testing.T) {
// Should still have no blocks since there was nothing to compact.
require.Empty(t, db.Blocks())
}
// TestCompactHeadWithSTStorage ensures that when EnableSTStorage is true,
// compacted blocks contain chunks with EncXOROptST encoding for float samples
// even when using the original Appender. The original Appender has no
// start-timestamp parameter, so every sample is stored with ST=0, but the
// chunk encoding is still driven by the EnableSTStorage flag.
func TestCompactHeadWithSTStorage(t *testing.T) {
	t.Parallel()
	opts := &Options{
		RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
		NoLockfile:        true,
		MinBlockDuration:  int64(time.Hour * 2 / time.Millisecond),
		MaxBlockDuration:  int64(time.Hour * 2 / time.Millisecond),
		WALCompression:    compression.Snappy,
		EnableSTStorage:   true,
	}
	db := newTestDB(t, withOpts(opts))
	ctx := context.Background()

	app := db.Appender(ctx)
	maxt := 100
	for i := range maxt {
		// Original Appender signature: (ref, labels, t, v) — no ST parameter.
		_, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), float64(i))
		require.NoError(t, err)
	}
	require.NoError(t, app.Commit())

	// Compact the Head to create a new block.
	require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)))

	// Check that we have exactly one block.
	require.Len(t, db.Blocks(), 1)
	b := db.Blocks()[0]

	// Open chunk reader and index reader.
	chunkr, err := b.Chunks()
	require.NoError(t, err)
	defer chunkr.Close()

	indexr, err := b.Index()
	require.NoError(t, err)
	defer indexr.Close()

	// Get postings for the series.
	p, err := indexr.Postings(ctx, "a", "b")
	require.NoError(t, err)

	chunkCount := 0
	for p.Next() {
		var builder labels.ScratchBuilder
		var chks []chunks.Meta
		require.NoError(t, indexr.Series(p.At(), &builder, &chks))
		for _, chk := range chks {
			c, _, err := chunkr.ChunkOrIterable(chk)
			require.NoError(t, err)
			// The assertion expects EncXOROptST: the encoding follows the
			// feature flag, not the appender used. The failure message must
			// agree with the asserted value.
			require.Equal(t, chunkenc.EncXOROptST, c.Encoding(),
				"expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding())
			chunkCount++
		}
	}
	require.NoError(t, p.Err())
	require.Positive(t, chunkCount, "expected at least one chunk")
}

View file

@ -160,6 +160,11 @@ type HeadOptions struct {
OutOfOrderTimeWindow atomic.Int64
OutOfOrderCapMax atomic.Int64
// EnableSTStorage determines whether databases (WAL/WBL, tsdb,
// agent) should set a Start Time value per sample.
// Represents 'st-storage' feature flag.
EnableSTStorage atomic.Bool
ChunkRange int64
// ChunkDirRoot is the parent directory of the chunks directory.
ChunkDirRoot string
@ -200,11 +205,6 @@ type HeadOptions struct {
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableMetadataWALRecords bool
// EnableSTStorage determines whether agent DB should write a Start Timestamp (ST)
// per sample to WAL.
// TODO(bwplotka): Implement this option as per PROM-60, currently it's noop.
EnableSTStorage bool
}
const (
@ -1386,7 +1386,7 @@ func (h *Head) truncateWAL(mint int64) error {
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint, h.opts.EnableSTStorage); err != nil {
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint, h.opts.EnableSTStorage.Load()); err != nil {
h.metrics.checkpointCreationFail.Inc()
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
@ -1680,7 +1680,7 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match
}
if h.wal != nil {
enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage}
enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage.Load()}
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err
}

View file

@ -185,6 +185,7 @@ func (h *Head) appender() *headAppender {
typesInBatch: h.getTypeMap(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
storeST: h.opts.EnableSTStorage.Load(),
},
}
}
@ -412,6 +413,7 @@ type headAppenderBase struct {
appendID, cleanupAppendIDsBelow uint64
closed bool
storeST bool
}
type headAppender struct {
headAppenderBase
@ -1387,7 +1389,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
ok, chunkCreated, mmapRefs = series.insert(a.storeST, s.ST, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1431,7 +1433,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
ok, chunkCreated = series.append(a.storeST, s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1492,7 +1494,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
// TODO(krajorama,ywwg): Pass ST when available in WAL.
ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1540,7 +1543,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
}
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
// TODO(krajorama,ywwg): pass ST when available in WAL.
ok, chunkCreated = series.appendHistogram(a.storeST, 0, s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1601,7 +1605,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
// TODO(krajorama,ywwg): Pass ST when available in WAL.
ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1649,7 +1654,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
}
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
// TODO(krajorama,ywwg): pass ST when available in WAL.
ok, chunkCreated = series.appendFloatHistogram(a.storeST, 0, s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1799,18 +1805,18 @@ func (a *headAppenderBase) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
func (s *memSeries) insert(storeST bool, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
c := s.ooo.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger)
c, mmapRefs = s.cutNewOOOHeadChunk(storeST, t, chunkDiskMapper, logger)
chunkCreated = true
}
ok := c.chunk.Insert(t, v, h, fh)
ok := c.chunk.Insert(st, t, v, h, fh)
if ok {
if chunkCreated || t < c.minTime {
c.minTime = t
@ -1833,13 +1839,12 @@ type chunkOpts struct {
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
// TODO(krajorama): pass ST.
s.app.Append(0, t, v)
s.app.Append(st, t, v)
c.maxTime = t
@ -1859,14 +1864,14 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1881,8 +1886,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
prevApp = nil
}
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed
s.lastHistogramValue = h
s.lastFloatHistogramValue = nil
@ -1917,14 +1921,14 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards and mmap used up chunks.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1939,8 +1943,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
prevApp = nil
}
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed.
s.lastHistogramValue = nil
s.lastFloatHistogramValue = fh
@ -2164,8 +2167,8 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s is locked and s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger)
func (s *memSeries) cutNewOOOHeadChunk(storeST bool, mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(storeST, chunkDiskMapper, logger)
s.ooo.oooHeadChunk = &oooHeadChunk{
chunk: NewOOOChunk(),
@ -2177,12 +2180,12 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
}
// s must be locked when calling.
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef {
func (s *memSeries) mmapCurrentOOOHeadChunk(storeST bool, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef {
if s.ooo == nil || s.ooo.oooHeadChunk == nil {
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
return nil
}
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, math.MinInt64, math.MaxInt64)
if err != nil {
handleChunkWriteError(err)
return nil

View file

@ -95,6 +95,7 @@ func (h *Head) appenderV2() *headAppenderV2 {
typesInBatch: h.getTypeMap(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
storeST: h.opts.EnableSTStorage.Load(),
},
}
}
@ -141,7 +142,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
}
// TODO(bwplotka): Handle ST natively (as per PROM-60).
if a.head.opts.EnableSTAsZeroSample && st != 0 {
if st != 0 && a.head.opts.EnableSTAsZeroSample {
a.bestEffortAppendSTZeroSample(s, ls, st, t, h, fh)
}
@ -177,7 +178,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
// we do not need to check for the difference between "unknown
// series" and "known series with stNone".
}
appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder)
appErr = a.appendFloat(s, st, t, v, opts.RejectOutOfOrder)
}
// Handle append error, if any.
if appErr != nil {
@ -218,7 +219,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
return storage.SeriesRef(s.ref), partialErr
}
func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error {
func (a *headAppenderV2) appendFloat(s *memSeries, st, t int64, v float64, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
@ -239,7 +240,7 @@ func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejec
}
b := a.getCurrentBatch(stFloat, s.ref)
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v})
b.floats = append(b.floats, record.RefSample{Ref: s.ref, ST: st, T: t, V: v})
b.floatSeries = append(b.floatSeries, s)
return nil
}
@ -366,7 +367,7 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La
}
err = a.appendHistogram(s, st, zeroHistogram, true)
default:
err = a.appendFloat(s, st, 0, true)
err = a.appendFloat(s, 0, st, 0, true)
}
if err != nil {

View file

@ -2925,13 +2925,15 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot_AppenderV2(t *testing.T) {
// TestWBLReplay checks the replay at a low level.
func TestWBLReplay_AppenderV2(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testWBLReplayAppenderV2(t, scenario)
})
for _, enableSTstorage := range []bool{false, true} {
t.Run(fmt.Sprintf("%s/st-storage=%v", name, enableSTstorage), func(t *testing.T) {
testWBLReplayAppenderV2(t, scenario, enableSTstorage)
})
}
}
}
func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableSTstorage bool) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
@ -2942,6 +2944,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
opts.EnableSTStorage.Store(enableSTstorage)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -2993,7 +2996,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
require.False(t, ok)
require.NotNil(t, ms)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(h.opts.EnableSTStorage.Load(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
require.Len(t, chks, 1)
@ -4754,3 +4757,138 @@ func TestHeadAppenderV2_Append_HistogramStalenessConversionMetrics(t *testing.T)
})
}
}
// TestHeadAppenderV2_STStorage verifies that when EnableSTStorage is true,
// start timestamps are properly stored in chunks and returned by queries.
// This test uses AppenderV2 which has native ST support.
func TestHeadAppenderV2_STStorage(t *testing.T) {
	testHistogram := tsdbutil.GenerateTestHistogram(1)
	testHistogram.CounterResetHint = histogram.NotCounterReset

	// sampleData describes one appended sample: its start timestamp, sample
	// timestamp, and either a float value or a histogram (h != nil).
	type sampleData struct {
		st      int64
		ts      int64
		fSample float64
		h       *histogram.Histogram
	}

	testCases := []struct {
		name        string
		samples     []sampleData
		expectedSTs []int64 // Expected ST values, one per appended sample, in order.
	}{
		{
			name: "Float samples with ST",
			samples: []sampleData{
				{st: 10, ts: 100, fSample: 1.0},
				{st: 20, ts: 200, fSample: 2.0},
				{st: 30, ts: 300, fSample: 3.0},
			},
			expectedSTs: []int64{10, 20, 30},
		},
		{
			name: "Float samples with varying ST",
			samples: []sampleData{
				{st: 5, ts: 100, fSample: 1.0},
				{st: 5, ts: 200, fSample: 2.0},   // Same ST
				{st: 150, ts: 300, fSample: 3.0}, // Different ST
			},
			expectedSTs: []int64{5, 5, 150},
		},
		{
			name: "Histogram samples",
			samples: []sampleData{
				{st: 10, ts: 100, h: testHistogram},
				{st: 20, ts: 200, h: testHistogram},
				{st: 30, ts: 300, h: testHistogram},
			},
			// Histograms don't support ST storage yet, should return 0
			expectedSTs: []int64{0, 0, 0},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
			opts.EnableSTStorage.Store(true)
			h, _ := newTestHeadWithOptions(t, compression.None, opts)

			lbls := labels.FromStrings("foo", "bar")

			// Use AppenderV2 which has native ST support
			a := h.AppenderV2(context.Background())
			for _, s := range tc.samples {
				_, err := a.Append(0, lbls, s.st, s.ts, s.fSample, s.h, nil, storage.AOptions{})
				require.NoError(t, err)
			}
			require.NoError(t, a.Commit())

			// Verify ST values are stored in chunks
			ctx := context.Background()
			idxReader, err := h.Index()
			require.NoError(t, err)
			defer idxReader.Close()

			chkReader, err := h.Chunks()
			require.NoError(t, err)
			defer chkReader.Close()

			p, err := idxReader.Postings(ctx, "foo", "bar")
			require.NoError(t, err)

			var lblBuilder labels.ScratchBuilder
			require.True(t, p.Next())
			sRef := p.At()
			var chkMetas []chunks.Meta
			require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas))

			// Read chunks and verify ST values
			var actualSTs []int64
			for _, meta := range chkMetas {
				chk, iterable, err := chkReader.ChunkOrIterable(meta)
				require.NoError(t, err)
				require.Nil(t, iterable)
				it := chk.Iterator(nil)
				for it.Next() != chunkenc.ValNone {
					st := it.AtST()
					actualSTs = append(actualSTs, st)
				}
				require.NoError(t, it.Err())
			}

			// A single assertion covers both float and histogram cases: the
			// expectedSTs table already encodes that histograms return 0.
			// (The previous if/else performed the identical check in both branches.)
			require.Equal(t, tc.expectedSTs, actualSTs, "chunk iterator should return expected ST values")

			// Also verify via querier
			q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)
			require.NoError(t, err)
			defer q.Close()
			ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
			require.True(t, ss.Next())
			series := ss.At()
			require.NoError(t, ss.Err())

			seriesIt := series.Iterator(nil)
			var queriedSTs []int64
			for seriesIt.Next() != chunkenc.ValNone {
				st := seriesIt.AtST()
				queriedSTs = append(queriedSTs, st)
			}
			require.NoError(t, seriesIt.Err())

			// Verify querier returns same ST values
			require.Equal(t, tc.expectedSTs, queriedSTs, "Querier should return same ST values as chunk iterator")
		})
	}
}

View file

@ -444,6 +444,17 @@ type ChunkReaderWithCopy interface {
ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)
}
// ChunkReaderWithSTStorage is an optional interface that ChunkReaders can
// implement to indicate whether ST (start time) storage is enabled.
type ChunkReaderWithSTStorage interface {
	// STStorageEnabled reports whether the reader's chunks may carry
	// per-sample start timestamps (the 'st-storage' feature flag).
	STStorageEnabled() bool
}

// STStorageEnabled returns whether ST storage is enabled in the Head.
// It reads the atomic EnableSTStorage head option, so it reflects the
// flag's current value at call time.
func (h *headChunkReader) STStorageEnabled() bool {
	return h.head.opts.EnableSTStorage.Load()
}
// ChunkOrIterableWithCopy returns the chunk for the reference number.
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk.
func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {

View file

@ -33,7 +33,7 @@ func TestMemSeries_chunk(t *testing.T) {
appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) {
for i := start; i < end; i += chunkStep {
ok, _ := s.append(i, float64(i), 0, chunkOpts{
ok, _ := s.append(false, 0, i, float64(i), 0, chunkOpts{
chunkDiskMapper: cdm,
chunkRange: chunkRange,
samplesPerChunk: DefaultSamplesPerChunk,

View file

@ -349,7 +349,7 @@ func BenchmarkLoadWLs(b *testing.B) {
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
s.append(c.mmappedChunkT, 42, 0, cOpts)
s.append(false, 0, c.mmappedChunkT, 42, 0, cOpts)
// There's only one head chunk because only a single sample is appended. mmapChunks()
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
// the sample at c.mmappedChunkT is mmapped.
@ -1492,7 +1492,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}
s.mmapChunks(chunkDiskMapper)
@ -1642,7 +1642,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
if tc.mmappedChunks > 0 {
headStart = (tc.mmappedChunks + 1) * chunkRange
for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep {
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}
series.mmapChunks(chunkDiskMapper)
@ -1652,7 +1652,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
series.headChunks = nil
} else {
for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep {
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed: %d", i)
}
}
@ -2183,7 +2183,41 @@ func TestComputeChunkEndTime(t *testing.T) {
}
}
// TestMemSeries_append tests float appending with various storeST/st combinations.
func TestMemSeries_append(t *testing.T) {
	cases := []struct {
		name    string
		storeST bool
		st      func(ts int64) int64 // derives the start timestamp from ts
	}{
		{name: "storeST=false st=0", storeST: false, st: func(_ int64) int64 { return 0 }},
		{name: "storeST=true st=0", storeST: true, st: func(_ int64) int64 { return 0 }},
		{name: "storeST=true st=ts", storeST: true, st: func(ts int64) int64 { return ts }},
		{name: "storeST=true st=ts-100", storeST: true, st: func(ts int64) int64 { return ts - 100 }},
		{name: "storeST=false st=ts (st ignored)", storeST: false, st: func(ts int64) int64 { return ts }},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			testMemSeriesAppend(t, c.storeST, c.st)
		})
	}
}
func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
@ -2202,20 +2236,20 @@ func TestMemSeries_append(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1, 0, cOpts)
ok, chunkCreated := s.append(storeST, stFunc(998), 998, 1, 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2, 0, cOpts)
ok, chunkCreated = s.append(storeST, stFunc(999), 999, 2, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
s.mmapChunks(chunkDiskMapper)
ok, chunkCreated = s.append(1000, 3, 0, cOpts)
ok, chunkCreated = s.append(storeST, stFunc(1000), 1000, 3, 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4, 0, cOpts)
ok, chunkCreated = s.append(storeST, stFunc(1001), 1001, 4, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -2229,7 +2263,8 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts)
ts := 1001 + int64(i)
ok, _ := s.append(storeST, stFunc(ts), ts, float64(i), 0, cOpts)
require.True(t, ok, "append failed")
}
s.mmapChunks(chunkDiskMapper)
@ -2244,7 +2279,41 @@ func TestMemSeries_append(t *testing.T) {
}
}
// TestMemSeries_appendHistogram tests histogram appending with various storeST/st combinations.
func TestMemSeries_appendHistogram(t *testing.T) {
	cases := []struct {
		name    string
		storeST bool
		st      func(ts int64) int64 // derives the start timestamp from ts
	}{
		{name: "storeST=false st=0", storeST: false, st: func(_ int64) int64 { return 0 }},
		{name: "storeST=true st=0", storeST: true, st: func(_ int64) int64 { return 0 }},
		{name: "storeST=true st=ts", storeST: true, st: func(ts int64) int64 { return ts }},
		{name: "storeST=true st=ts-100", storeST: true, st: func(ts int64) int64 { return ts - 100 }},
		{name: "storeST=false st=ts (st ignored)", storeST: false, st: func(ts int64) int64 { return ts }},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			testMemSeriesAppendHistogram(t, c.storeST, c.st)
		})
	}
}
func testMemSeriesAppendHistogram(t *testing.T, storeST bool, stFunc func(ts int64) int64) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
@ -2270,19 +2339,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts)
ok, chunkCreated := s.appendHistogram(storeST, stFunc(998), 998, histograms[0], 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts)
ok, chunkCreated = s.appendHistogram(storeST, stFunc(999), 999, histograms[1], 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts)
ok, chunkCreated = s.appendHistogram(storeST, stFunc(1000), 1000, histograms[2], 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts)
ok, chunkCreated = s.appendHistogram(storeST, stFunc(1001), 1001, histograms[3], 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -2293,7 +2362,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range")
require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range")
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts)
ok, chunkCreated = s.appendHistogram(storeST, stFunc(1002), 1002, histogramWithOneMoreBucket, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
@ -2328,7 +2397,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
var nextTs int64
var totalAppendedSamples int
for i := range samplesPerChunk / 4 {
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts)
require.Truef(t, ok, "slow sample %d was not appended", i)
nextTs += slowRate
totalAppendedSamples++
@ -2337,12 +2406,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
// Suddenly, the rate increases and we receive a sample every millisecond.
for i := range math.MaxUint16 {
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts)
require.Truef(t, ok, "quick sample %d was not appended", i)
nextTs++
totalAppendedSamples++
}
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts)
ok, chunkCreated := s.append(false, 0, DefaultBlockDuration, float64(0), 0, cOpts)
require.True(t, ok, "new chunk sample was not appended")
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
@ -2371,18 +2440,18 @@ func TestGCChunkAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, cOpts)
ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, cOpts)
ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -2427,18 +2496,18 @@ func TestGCSeriesAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, cOpts)
ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, cOpts)
ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -2775,10 +2844,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
require.True(t, created, "series was not created")
for i := range 7 {
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
ok, chunkCreated := s.append(false, 0, int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunk was not created")
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
ok, chunkCreated = s.append(false, 0, int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
h.chunkDiskMapper.CutNewFile()
@ -3118,7 +3187,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
ok, _ := s.append(0, 0, 0, cOpts)
ok, _ := s.append(false, 0, 0, 0, 0, cOpts)
require.True(t, ok, "Series append failed.")
require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an appendID after append with appendID=0.")
}
@ -3678,7 +3747,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
for i := range 7 {
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}
@ -5569,7 +5638,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
require.False(t, ok)
require.NotNil(t, ms)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
require.Len(t, chks, 1)
@ -7267,3 +7336,137 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) {
})
}
}
// TestHeadAppender_STStorage_Disabled verifies that when EnableSTStorage is false,
// start timestamps are NOT stored in chunks (AtST returns 0).
func TestHeadAppender_STStorage_Disabled(t *testing.T) {
	type sampleData struct {
		st      int64
		ts      int64
		fSample float64
	}
	input := []sampleData{
		{st: 10, ts: 100, fSample: 1.0},
		{st: 20, ts: 200, fSample: 2.0},
		{st: 30, ts: 300, fSample: 3.0},
	}

	headOpts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
	headOpts.EnableSTStorage.Store(false) // Explicitly disable ST storage.
	head, _ := newTestHeadWithOptions(t, compression.None, headOpts)

	seriesLabels := labels.FromStrings("foo", "bar")

	// Append every sample, including its ST value, through AppenderV2.
	app := head.AppenderV2(context.Background())
	for _, smpl := range input {
		_, err := app.Append(0, seriesLabels, smpl.st, smpl.ts, smpl.fSample, nil, nil, storage.AOptions{})
		require.NoError(t, err)
	}
	require.NoError(t, app.Commit())

	// The stored chunks must not carry any ST values: AtST must yield 0
	// for every sample of every chunk of the series.
	ctx := context.Background()
	ir, err := head.Index()
	require.NoError(t, err)
	defer ir.Close()
	cr, err := head.Chunks()
	require.NoError(t, err)
	defer cr.Close()

	postings, err := ir.Postings(ctx, "foo", "bar")
	require.NoError(t, err)

	var builder labels.ScratchBuilder
	require.True(t, postings.Next())
	ref := postings.At()
	var metas []chunks.Meta
	require.NoError(t, ir.Series(ref, &builder, &metas))

	for _, meta := range metas {
		chk, iterable, err := cr.ChunkOrIterable(meta)
		require.NoError(t, err)
		require.Nil(t, iterable)
		iter := chk.Iterator(nil)
		for iter.Next() != chunkenc.ValNone {
			require.Equal(t, int64(0), iter.AtST(), "ST should be 0 when EnableSTStorage is false")
		}
		require.NoError(t, iter.Err())
	}
}
// TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding
// is used based on EnableSTStorage setting.
func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {
	input := []struct {
		st      int64
		ts      int64
		fSample float64
	}{
		{st: 10, ts: 100, fSample: 1.0},
		{st: 20, ts: 200, fSample: 2.0},
	}

	for _, enableST := range []bool{false, true} {
		t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) {
			headOpts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
			headOpts.EnableSTStorage.Store(enableST)
			head, _ := newTestHeadWithOptions(t, compression.None, headOpts)

			seriesLabels := labels.FromStrings("foo", "bar")

			// Write each sample preceded by its ST zero sample.
			app := head.Appender(context.Background())
			for _, smpl := range input {
				_, err := app.AppendSTZeroSample(0, seriesLabels, smpl.ts, smpl.st)
				require.NoError(t, err)
				_, err = app.Append(0, seriesLabels, smpl.ts, smpl.fSample)
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())

			// Inspect the encoding of every chunk written for the series.
			ctx := context.Background()
			ir, err := head.Index()
			require.NoError(t, err)
			defer ir.Close()
			cr, err := head.Chunks()
			require.NoError(t, err)
			defer cr.Close()

			postings, err := ir.Postings(ctx, "foo", "bar")
			require.NoError(t, err)

			var builder labels.ScratchBuilder
			require.True(t, postings.Next())
			ref := postings.At()
			var metas []chunks.Meta
			require.NoError(t, ir.Series(ref, &builder, &metas))
			require.NotEmpty(t, metas)

			for _, meta := range metas {
				chk, iterable, err := cr.ChunkOrIterable(meta)
				require.NoError(t, err)
				require.Nil(t, iterable)
				if enableST {
					// ST storage on: the ST-capable XOR encoding must be used.
					require.Equal(t, chunkenc.EncXOROptST, chk.Encoding(),
						"Expected ST-capable encoding when EnableSTStorage is true")
				} else {
					// ST storage off: the plain XOR encoding must be used.
					require.Equal(t, chunkenc.EncXOR, chk.Encoding(),
						"Expected regular XOR encoding when EnableSTStorage is false")
				}
			}
		})
	}
}

View file

@ -115,8 +115,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}()
wg.Add(concurrency)
storeST := h.opts.EnableSTStorage.Load()
for i := range concurrency {
processors[i].setup()
processors[i].setup(storeST)
go func(wp *walSubsetProcessor) {
missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
@ -576,6 +577,7 @@ type walSubsetProcessor struct {
input chan walSubsetProcessorInputItem
output chan []record.RefSample
histogramsOutput chan []histogramRecord
storeST bool
}
type walSubsetProcessorInputItem struct {
@ -586,10 +588,11 @@ type walSubsetProcessorInputItem struct {
deletedSeriesRefs []chunks.HeadSeriesRef
}
func (wp *walSubsetProcessor) setup() {
func (wp *walSubsetProcessor) setup(storeST bool) {
wp.input = make(chan walSubsetProcessorInputItem, 300)
wp.output = make(chan []record.RefSample, 300)
wp.histogramsOutput = make(chan []histogramRecord, 300)
wp.storeST = storeST
}
func (wp *walSubsetProcessor) closeAndDrain() {
@ -666,7 +669,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
h.numStaleSeries.Dec()
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
if _, chunkCreated := ms.append(wp.storeST, s.ST, s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
_ = ms.mmapChunks(h.chunkDiskMapper)
@ -703,14 +706,16 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum)
}
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
// TODO(krajorama,ywwg): Pass ST when available in WBL.
_, chunkCreated = ms.appendHistogram(wp.storeST, 0, s.t, s.h, 0, appendChunkOpts)
} else {
newlyStale = value.IsStaleNaN(s.fh.Sum)
if ms.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum)
}
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
// TODO(krajorama,ywwg): Pass ST when available in WBL.
_, chunkCreated = ms.appendFloatHistogram(wp.storeST, 0, s.t, s.fh, 0, appendChunkOpts)
}
if newlyStale {
h.numStaleSeries.Inc()
@ -779,8 +784,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}()
wg.Add(concurrency)
storeST := h.opts.EnableSTStorage.Load()
for i := range concurrency {
processors[i].setup()
processors[i].setup(storeST)
go func(wp *wblSubsetProcessor) {
missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h)
@ -1025,6 +1031,7 @@ type wblSubsetProcessor struct {
input chan wblSubsetProcessorInputItem
output chan []record.RefSample
histogramsOutput chan []histogramRecord
storeST bool
}
type wblSubsetProcessorInputItem struct {
@ -1033,10 +1040,11 @@ type wblSubsetProcessorInputItem struct {
histogramSamples []histogramRecord
}
func (wp *wblSubsetProcessor) setup() {
func (wp *wblSubsetProcessor) setup(storeST bool) {
wp.output = make(chan []record.RefSample, 300)
wp.histogramsOutput = make(chan []histogramRecord, 300)
wp.input = make(chan wblSubsetProcessorInputItem, 300)
wp.storeST = storeST
}
func (wp *wblSubsetProcessor) closeAndDrain() {
@ -1096,7 +1104,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
missingSeries[s.Ref] = struct{}{}
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
ok, chunkCreated, _ := ms.insert(wp.storeST, s.ST, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@ -1124,9 +1132,11 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
var chunkCreated bool
var ok bool
if s.h != nil {
ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
// TODO(krajorama,ywwg): Pass ST when available in WBL.
ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
} else {
ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
// TODO(krajorama,ywwg): Pass ST when available in WBL.
ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
}
if chunkCreated {
h.metrics.chunksCreated.Inc()
@ -1400,7 +1410,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
maxExemplarsPerRecord := 10000
batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage}
enc := record.Encoder{EnableSTStorage: h.opts.EnableSTStorage.Load()}
flushExemplars := func() error {
if len(batch) == 0 {
return nil

View file

@ -34,14 +34,13 @@ func NewOOOChunk() *OOOChunk {
// Insert inserts the sample such that order is maintained.
// Returns false if insert was not possible due to the same timestamp already existing.
func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
func (o *OOOChunk) Insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
// Although out-of-order samples can be out-of-order amongst themselves, we
// are opinionated and expect them to be usually in-order meaning we could
// try to append at the end first if the new timestamp is higher than the
// last known timestamp.
if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t {
// TODO(krajorama): pass ST.
o.samples = append(o.samples, sample{0, t, v, h, fh})
o.samples = append(o.samples, sample{st, t, v, h, fh})
return true
}
@ -50,8 +49,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog
if i >= len(o.samples) {
// none found. append it at the end
// TODO(krajorama): pass ST.
o.samples = append(o.samples, sample{0, t, v, h, fh})
o.samples = append(o.samples, sample{st, t, v, h, fh})
return true
}
@ -63,8 +61,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog
// Expand length by 1 to make room. use a zero sample, we will overwrite it anyway.
o.samples = append(o.samples, sample{})
copy(o.samples[i+1:], o.samples[i:])
// TODO(krajorama): pass ST.
o.samples[i] = sample{0, t, v, h, fh}
o.samples[i] = sample{st, t, v, h, fh}
return true
}
@ -76,7 +73,7 @@ func (o *OOOChunk) NumSamples() int {
// ToEncodedChunks returns chunks with the samples in the OOOChunk.
//
//nolint:revive
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) {
func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memChunk, err error) {
if len(o.samples) == 0 {
return nil, nil
}
@ -96,10 +93,13 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
if s.t > maxt {
break
}
encoding := chunkenc.EncXOR
if s.h != nil {
encoding := chunkenc.ValFloat.ChunkEncoding(storeST)
switch {
case s.h != nil:
// TODO(krajorama): use ST capable histogram chunk.
encoding = chunkenc.EncHistogram
} else if s.fh != nil {
case s.fh != nil:
// TODO(krajorama): use ST capable float histogram chunk.
encoding = chunkenc.EncFloatHistogram
}
@ -111,15 +111,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
cmint = s.t
switch encoding {
case chunkenc.EncXOR:
chunk = chunkenc.NewXORChunk()
case chunkenc.EncHistogram:
chunk = chunkenc.NewHistogramChunk()
case chunkenc.EncFloatHistogram:
chunk = chunkenc.NewFloatHistogramChunk()
default:
chunk = chunkenc.NewXORChunk()
chunk, err = chunkenc.NewEmptyChunk(encoding)
if err != nil {
// This should never happen. No point using a default type as
// calling the wrong append function would panic.
return chks, err
}
app, err = chunk.Appender()
if err != nil {
@ -127,18 +123,17 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
}
}
switch encoding {
case chunkenc.EncXOR:
// TODO(krajorama): pass ST.
app.Append(0, s.t, s.f)
case chunkenc.EncXOR, chunkenc.EncXOROptST:
app.Append(s.st, s.t, s.f)
case chunkenc.EncHistogram:
// TODO(krajorama): handle ST capable histogram chunk.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
)
// TODO(krajorama): pass ST.
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false)
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.st, s.t, s.h, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
@ -147,14 +142,14 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chunk = newChunk
}
case chunkenc.EncFloatHistogram:
// TODO(krajorama): handle ST capable float histogram chunk.
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
)
// TODO(krajorama): pass ST.
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false)
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.st, s.t, s.fh, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})

View file

@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
*chks = (*chks)[:0]
if s.ooo != nil {
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
return getOOOSeriesChunks(s, oh.head.opts.EnableSTStorage.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
}
*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
return nil
@ -88,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@ -106,7 +106,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, c.minTime, c.maxTime)
if err != nil {
handleChunkWriteError(err)
return nil
@ -230,6 +230,11 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
return c, it, err
}
// STStorageEnabled returns whether ST storage is enabled in the Head.
// It implements the optional ChunkReaderWithSTStorage interface by
// atomically reading the Head's EnableSTStorage option.
func (cr *HeadAndOOOChunkReader) STStorageEnabled() bool {
	return cr.head.opts.EnableSTStorage.Load()
}
// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
// behaviour is only implemented for the in-order head chunk.
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
@ -347,7 +352,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
}
var lastMmapRef chunks.ChunkDiskMapperRef
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger)
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.opts.EnableSTStorage.Load(), head.chunkDiskMapper, head.logger)
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
@ -481,7 +486,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
return nil
}
return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableSTStorage.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
}
func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) {

View file

@ -31,10 +31,11 @@ const testMaxSize int = 32
func valEven(pos int) int64 { return int64(pos*2 + 2) } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values
func valOdd(pos int) int64 { return int64(pos*2 + 1) } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals.
func makeEvenSampleSlice(n int, sampleFunc func(ts int64) sample) []sample {
func makeEvenSampleSlice(n int, sampleFunc func(st, ts int64) sample) []sample {
s := make([]sample, n)
for i := range n {
s[i] = sampleFunc(valEven(i))
ts := valEven(i)
s[i] = sampleFunc(ts, ts) // Use ts as st for consistency
}
return s
}
@ -43,23 +44,50 @@ func makeEvenSampleSlice(n int, sampleFunc func(ts int64) sample) []sample {
// - Number of pre-existing samples anywhere from 0 to testMaxSize-1.
// - Insert new sample before first pre-existing samples, after the last, and anywhere in between.
// - With a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves.
// - With st=0 and st!=0 to verify ordering is based on sample.t, not sample.st.
func TestOOOInsert(t *testing.T) {
scenarios := map[string]struct {
sampleFunc func(ts int64) sample
sampleFunc func(st, ts int64) sample
}{
"float": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, f: float64(ts)}
"float st=0": {
sampleFunc: func(st, ts int64) sample {
return sample{st: 0, t: ts, f: float64(ts)}
},
},
"integer histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(ts)}
"float st=ts": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts, t: ts, f: float64(ts)}
},
},
"float histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)}
"float st=ts-100": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts - 100, t: ts, f: float64(ts)}
},
},
"float st descending while t ascending": {
// st values go in opposite direction of t to ensure ordering is by t
sampleFunc: func(st, ts int64) sample {
return sample{st: 1000 - ts, t: ts, f: float64(ts)}
},
},
"integer histogram st=0": {
sampleFunc: func(st, ts int64) sample {
return sample{st: 0, t: ts, h: tsdbutil.GenerateTestHistogram(ts)}
},
},
"integer histogram st=ts": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts, t: ts, h: tsdbutil.GenerateTestHistogram(ts)}
},
},
"float histogram st=0": {
sampleFunc: func(st, ts int64) sample {
return sample{st: 0, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)}
},
},
"float histogram st=ts": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)}
},
},
}
@ -71,7 +99,7 @@ func TestOOOInsert(t *testing.T) {
}
func testOOOInsert(t *testing.T,
sampleFunc func(ts int64) sample,
sampleFunc func(st, ts int64) sample,
) {
for numPreExisting := 0; numPreExisting <= testMaxSize; numPreExisting++ {
// For example, if we have numPreExisting 2, then:
@ -84,19 +112,22 @@ func testOOOInsert(t *testing.T,
chunk := NewOOOChunk()
chunk.samples = make([]sample, numPreExisting)
chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc)
newSample := sampleFunc(valOdd(insertPos))
chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh)
ts := valOdd(insertPos)
newSample := sampleFunc(ts, ts) // Use ts as st for consistency
chunk.Insert(newSample.st, newSample.t, newSample.f, newSample.h, newSample.fh)
var expSamples []sample
// Our expected new samples slice, will be first the original samples.
for i := 0; i < insertPos; i++ {
expSamples = append(expSamples, sampleFunc(valEven(i)))
ts := valEven(i)
expSamples = append(expSamples, sampleFunc(ts, ts))
}
// Then the new sample.
expSamples = append(expSamples, newSample)
// Followed by any original samples that were pushed back by the new one.
for i := insertPos; i < numPreExisting; i++ {
expSamples = append(expSamples, sampleFunc(valEven(i)))
ts := valEven(i)
expSamples = append(expSamples, sampleFunc(ts, ts))
}
require.Equal(t, expSamples, chunk.samples, "numPreExisting %d, insertPos %d", numPreExisting, insertPos)
@ -107,23 +138,50 @@ func testOOOInsert(t *testing.T,
// TestOOOInsertDuplicate tests the correct behavior when inserting a sample that is a duplicate of any
// pre-existing samples, with between 1 and testMaxSize pre-existing samples and
// with a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves.
// With st=0 and st!=0 to verify duplicate detection is based on sample.t, not sample.st.
func TestOOOInsertDuplicate(t *testing.T) {
scenarios := map[string]struct {
sampleFunc func(ts int64) sample
sampleFunc func(st, ts int64) sample
}{
"float": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, f: float64(ts)}
"float st=0": {
sampleFunc: func(st, ts int64) sample {
return sample{st: 0, t: ts, f: float64(ts)}
},
},
"integer histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(ts)}
"float st=ts": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts, t: ts, f: float64(ts)}
},
},
"float histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)}
"float st=ts-100": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts - 100, t: ts, f: float64(ts)}
},
},
"float st descending while t ascending": {
// st values go in opposite direction of t to ensure duplicate detection is by t
sampleFunc: func(st, ts int64) sample {
return sample{st: 1000 - ts, t: ts, f: float64(ts)}
},
},
"integer histogram st=0": {
sampleFunc: func(st, ts int64) sample {
return sample{st: 0, t: ts, h: tsdbutil.GenerateTestHistogram(ts)}
},
},
"integer histogram st=ts": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts, t: ts, h: tsdbutil.GenerateTestHistogram(ts)}
},
},
"float histogram st=0": {
sampleFunc: func(st, ts int64) sample {
return sample{st: 0, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)}
},
},
"float histogram st=ts": {
sampleFunc: func(st, ts int64) sample {
return sample{st: ts, t: ts, fh: tsdbutil.GenerateTestFloatHistogram(ts)}
},
},
}
@ -135,7 +193,7 @@ func TestOOOInsertDuplicate(t *testing.T) {
}
func testOOOInsertDuplicate(t *testing.T,
sampleFunc func(ts int64) sample,
sampleFunc func(st, ts int64) sample,
) {
for num := 1; num <= testMaxSize; num++ {
for dupPos := 0; dupPos < num; dupPos++ {
@ -145,7 +203,7 @@ func testOOOInsertDuplicate(t *testing.T,
dupSample := chunk.samples[dupPos]
dupSample.f = 0.123
ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
ok := chunk.Insert(dupSample.st, dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change.
require.False(t, ok)
@ -252,17 +310,17 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
for _, s := range tc.samples {
switch s.Type() {
case chunkenc.ValFloat:
oooChunk.Insert(s.t, s.f, nil, nil)
oooChunk.Insert(s.st, s.t, s.f, nil, nil)
case chunkenc.ValHistogram:
oooChunk.Insert(s.t, 0, s.h.Copy(), nil)
oooChunk.Insert(s.st, s.t, 0, s.h.Copy(), nil)
case chunkenc.ValFloatHistogram:
oooChunk.Insert(s.t, 0, nil, s.fh.Copy())
oooChunk.Insert(s.st, s.t, 0, nil, s.fh.Copy())
default:
t.Fatalf("unexpected sample type %d", s.Type())
}
}
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
chunks, err := oooChunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
require.Len(t, chunks, len(tc.expectedChunks), "number of chunks")
sampleIndex := 0
@ -308,3 +366,87 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
})
}
}
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with storeST=true and storeST=false for float samples.
// When storeST=true, st values are preserved; when storeST=false, AtST() returns 0.
// TODO(@krajorama): Add histogram test cases once ST storage is implemented for histograms.
func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
	// Each case is the set of float samples inserted into the OOO chunk;
	// the st/t relationship varies per case.
	cases := map[string][]sample{
		"floats with st=0": {
			{st: 0, t: 1000, f: 43.0},
			{st: 0, t: 1100, f: 42.0},
		},
		"floats with st=t": {
			{st: 1000, t: 1000, f: 43.0},
			{st: 1100, t: 1100, f: 42.0},
		},
		"floats with st=t-100": {
			{st: 900, t: 1000, f: 43.0},
			{st: 1000, t: 1100, f: 42.0},
		},
		"floats with varying st": {
			{st: 500, t: 1000, f: 43.0},
			{st: 1100, t: 1100, f: 42.0}, // st == t
			{st: 0, t: 1200, f: 41.0},    // st == 0
		},
	}

	// Every case runs under both storage modes; only the expected chunk
	// encoding differs between them.
	modes := []struct {
		name    string
		storeST bool
		wantEnc chunkenc.Encoding
	}{
		{"storeST=true", true, chunkenc.EncXOROptST},
		{"storeST=false", false, chunkenc.EncXOR},
	}

	for name, samples := range cases {
		for _, mode := range modes {
			t.Run(name+"/"+mode.name, func(t *testing.T) {
				var ooo OOOChunk
				for _, s := range samples {
					ooo.Insert(s.st, s.t, s.f, nil, nil)
				}

				encoded, err := ooo.ToEncodedChunks(mode.storeST, math.MinInt64, math.MaxInt64)
				require.NoError(t, err)
				require.Len(t, encoded, 1, "number of chunks")

				c := encoded[0]
				require.Equal(t, mode.wantEnc, c.chunk.Encoding(), "chunk encoding")
				require.Equal(t, samples[0].t, c.minTime, "chunk minTime")
				require.Equal(t, samples[len(samples)-1].t, c.maxTime, "chunk maxTime")

				// Iterate the encoded chunk and confirm st and t round-trip
				// according to the storage mode.
				it := c.chunk.Iterator(nil)
				i := 0
				for it.Next() == chunkenc.ValFloat {
					gotT, gotF := it.At()
					gotST := it.AtST()
					if mode.storeST {
						// With ST storage on, the inserted st must survive encoding.
						require.Equal(t, samples[i].st, gotST, "sample %d st", i)
					} else {
						// With ST storage off, AtST() yields the zero value.
						require.Equal(t, int64(0), gotST, "sample %d st should be 0 when storeST=false", i)
					}
					require.Equal(t, samples[i].t, gotT, "sample %d t", i)
					require.Equal(t, samples[i].f, gotF, "sample %d f", i)
					i++
				}
				require.Equal(t, len(samples), i, "number of samples")
			})
		}
	}
}

View file

@ -695,6 +695,15 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
// stStorageEnabled reports whether the underlying ChunkReader has ST storage
// enabled. When the reader does not implement ChunkReaderWithSTStorage the
// answer is false.
func (p *populateWithDelGenericSeriesIterator) stStorageEnabled() bool {
	cr, ok := p.cr.(ChunkReaderWithSTStorage)
	return ok && cr.STStorageEnabled()
}
type blockSeriesEntry struct {
chunks ChunkReader
blockID ulid.ULID
@ -885,12 +894,17 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
st, t int64
err error
)
newChunk, err = valueType.NewChunk(p.stStorageEnabled())
if err != nil {
p.err = fmt.Errorf("create new chunk while re-encoding: %w", err)
return false
}
if app, err = newChunk.Appender(); err != nil {
p.err = fmt.Errorf("get chunk appender while re-encoding: %w", err)
return false
}
switch valueType {
case chunkenc.ValHistogram:
newChunk = chunkenc.NewHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
@ -905,10 +919,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
}
}
case chunkenc.ValFloat:
newChunk = chunkenc.NewXORChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloat {
err = fmt.Errorf("found value type %v in float chunk", vt)
@ -920,10 +930,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
app.Append(st, t, v)
}
case chunkenc.ValFloatHistogram:
newChunk = chunkenc.NewFloatHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloatHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
@ -1000,7 +1006,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
}
cmint = p.currDelIter.AtT()
if currentChunk, err = currentValueType.NewChunk(); err != nil {
if currentChunk, err = currentValueType.NewChunk(p.stStorageEnabled()); err != nil {
break
}
if app, err = currentChunk.Appender(); err != nil {