mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-09 00:22:19 -04:00
feat(tsdb): allow appending to ST capable XOR chunk optionally
See PR description for uptodate info on details. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
dc8613df54
commit
e40f988f5c
12 changed files with 220 additions and 128 deletions
|
|
@ -341,11 +341,14 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
|||
i := 0
|
||||
seriesIter := s.Series.Iterator(nil)
|
||||
lastType := chunkenc.ValNone
|
||||
lastHadST := false
|
||||
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
|
||||
if typ != lastType || i >= seriesToChunkEncoderSplit {
|
||||
st := seriesIter.AtST()
|
||||
hasST := st != 0
|
||||
if typ != lastType || lastHadST != hasST || i >= seriesToChunkEncoderSplit {
|
||||
// Create a new chunk if the sample type changed or too many samples in the current one.
|
||||
chks = appendChunk(chks, mint, maxt, chk)
|
||||
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
|
||||
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(), hasST)
|
||||
if err != nil {
|
||||
return errChunksIterator{err: err}
|
||||
}
|
||||
|
|
@ -358,21 +361,20 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
|||
i = 0
|
||||
}
|
||||
lastType = typ
|
||||
lastHadST = hasST
|
||||
|
||||
var (
|
||||
st, t int64
|
||||
v float64
|
||||
h *histogram.Histogram
|
||||
fh *histogram.FloatHistogram
|
||||
t int64
|
||||
v float64
|
||||
h *histogram.Histogram
|
||||
fh *histogram.FloatHistogram
|
||||
)
|
||||
switch typ {
|
||||
case chunkenc.ValFloat:
|
||||
t, v = seriesIter.At()
|
||||
st = seriesIter.AtST()
|
||||
app.Append(st, t, v)
|
||||
case chunkenc.ValHistogram:
|
||||
t, h = seriesIter.AtHistogram(nil)
|
||||
st = seriesIter.AtST()
|
||||
newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false)
|
||||
if err != nil {
|
||||
return errChunksIterator{err: err}
|
||||
|
|
@ -388,7 +390,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
|||
}
|
||||
case chunkenc.ValFloatHistogram:
|
||||
t, fh = seriesIter.AtFloatHistogram(nil)
|
||||
st = seriesIter.AtST()
|
||||
newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false)
|
||||
if err != nil {
|
||||
return errChunksIterator{err: err}
|
||||
|
|
|
|||
|
|
@ -76,6 +76,8 @@ type Chunk interface {
|
|||
Bytes() []byte
|
||||
|
||||
// Encoding returns the encoding type of the chunk.
|
||||
// If the chunk is capable of storing ST (start timestamps), it should
|
||||
// return the appropriate encoding type (e.g., EncXOROptST).
|
||||
Encoding() Encoding
|
||||
|
||||
// Appender returns an appender to append samples to the chunk.
|
||||
|
|
@ -202,13 +204,34 @@ func (v ValueType) ChunkEncoding() Encoding {
|
|||
}
|
||||
}
|
||||
|
||||
func (v ValueType) NewChunk() (Chunk, error) {
|
||||
func (v ValueType) ChunkEncodingWithST(st int64) Encoding {
|
||||
switch v {
|
||||
case ValFloat:
|
||||
if st != 0 {
|
||||
return EncXOROptST
|
||||
}
|
||||
return EncXOR
|
||||
case ValHistogram:
|
||||
return EncHistogram
|
||||
case ValFloatHistogram:
|
||||
return EncFloatHistogram
|
||||
default:
|
||||
return EncNone
|
||||
}
|
||||
}
|
||||
|
||||
func (v ValueType) NewChunk(storeST bool) (Chunk, error) {
|
||||
switch v {
|
||||
case ValFloat:
|
||||
if storeST {
|
||||
return NewXOROptSTChunk(), nil
|
||||
}
|
||||
return NewXORChunk(), nil
|
||||
case ValHistogram:
|
||||
// TODO(krajorama): return a ST capable histogram chunk when they are supported.
|
||||
return NewHistogramChunk(), nil
|
||||
case ValFloatHistogram:
|
||||
// TODO(krajorama): return a ST capable float histogram chunk when they are supported.
|
||||
return NewFloatHistogramChunk(), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("value type %v unsupported", v)
|
||||
|
|
@ -399,9 +422,12 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
|
|||
}
|
||||
|
||||
// NewEmptyChunk returns an empty chunk for the given encoding.
|
||||
func NewEmptyChunk(e Encoding) (Chunk, error) {
|
||||
func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) {
|
||||
switch e {
|
||||
case EncXOR:
|
||||
if storeST {
|
||||
return NewXOROptSTChunk(), nil
|
||||
}
|
||||
return NewXORChunk(), nil
|
||||
case EncHistogram:
|
||||
return NewHistogramChunk(), nil
|
||||
|
|
|
|||
|
|
@ -135,7 +135,9 @@ type Meta struct {
|
|||
}
|
||||
|
||||
// ChunkFromSamples requires all samples to have the same type.
|
||||
// TODO(krajorama): test with ST when chunk formats support it.
|
||||
// It is not efficient and meant for testing purposes only.
|
||||
// It scans the samples to determine whether any sample has ST set and
|
||||
// creates a chunk accordingly.
|
||||
func ChunkFromSamples(s []Sample) (Meta, error) {
|
||||
return ChunkFromSamplesGeneric(SampleSlice(s))
|
||||
}
|
||||
|
|
@ -154,7 +156,17 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
|
|||
}
|
||||
|
||||
sampleType := s.Get(0).Type()
|
||||
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding())
|
||||
|
||||
hasST := false
|
||||
for i := range s.Len() {
|
||||
if s.Get(i).ST() != 0 {
|
||||
hasST = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Request storing ST in the chunk if available.
|
||||
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(), hasST)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
)
|
||||
|
||||
|
|
@ -58,3 +59,35 @@ func TestWriterWithDefaultSegmentSize(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Len(t, d, 1, "expected only one segment to be created to hold both chunks")
|
||||
}
|
||||
|
||||
func TestChunkFromSamplesWithST(t *testing.T) {
|
||||
// Create samples with explicit ST (source timestamp) values
|
||||
samples := []Sample{
|
||||
sample{t: 10, f: 11, st: 5},
|
||||
sample{t: 20, f: 12, st: 15},
|
||||
sample{t: 30, f: 13, st: 25},
|
||||
}
|
||||
|
||||
chk, err := ChunkFromSamples(samples)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, chk.Chunk)
|
||||
|
||||
// Verify MinTime and MaxTime
|
||||
require.Equal(t, int64(10), chk.MinTime)
|
||||
require.Equal(t, int64(30), chk.MaxTime)
|
||||
|
||||
// Iterate over the chunk and verify ST values are preserved
|
||||
it := chk.Chunk.Iterator(nil)
|
||||
idx := 0
|
||||
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
|
||||
require.Equal(t, chunkenc.ValFloat, vt)
|
||||
ts, v := it.At()
|
||||
st := it.AtST()
|
||||
require.Equal(t, samples[idx].ST(), st, "ST mismatch at index %d", idx)
|
||||
require.Equal(t, samples[idx].T(), ts, "T mismatch at index %d", idx)
|
||||
require.Equal(t, samples[idx].F(), v, "F mismatch at index %d", idx)
|
||||
idx++
|
||||
}
|
||||
require.NoError(t, it.Err())
|
||||
require.Equal(t, len(samples), idx, "expected all samples to be iterated")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -185,6 +185,7 @@ func (h *Head) appender() *headAppender {
|
|||
typesInBatch: h.getTypeMap(),
|
||||
appendID: appendID,
|
||||
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
||||
storeST: h.opts.EnableSTStorage.Load(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -412,6 +413,7 @@ type headAppenderBase struct {
|
|||
|
||||
appendID, cleanupAppendIDsBelow uint64
|
||||
closed bool
|
||||
storeST bool
|
||||
}
|
||||
type headAppender struct {
|
||||
headAppenderBase
|
||||
|
|
@ -1387,7 +1389,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
|
|||
// Sample is OOO and OOO handling is enabled
|
||||
// and the delta is within the OOO tolerance.
|
||||
var mmapRefs []chunks.ChunkDiskMapperRef
|
||||
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WAL.
|
||||
ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
|
||||
if chunkCreated {
|
||||
r, ok := acc.oooMmapMarkers[series.ref]
|
||||
if !ok || r != nil {
|
||||
|
|
@ -1431,7 +1434,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
|
|||
default:
|
||||
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
|
||||
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
|
||||
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
|
||||
// TODO(krajorama,ywwg): pass ST when available in WAL.
|
||||
ok, chunkCreated = series.append(a.storeST, 0, s.T, s.V, a.appendID, acc.appendChunkOpts)
|
||||
if ok {
|
||||
if s.T < acc.inOrderMint {
|
||||
acc.inOrderMint = s.T
|
||||
|
|
@ -1492,7 +1496,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
|
|||
// Sample is OOO and OOO handling is enabled
|
||||
// and the delta is within the OOO tolerance.
|
||||
var mmapRefs []chunks.ChunkDiskMapperRef
|
||||
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WAL.
|
||||
ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
|
||||
if chunkCreated {
|
||||
r, ok := acc.oooMmapMarkers[series.ref]
|
||||
if !ok || r != nil {
|
||||
|
|
@ -1540,7 +1545,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
|
|||
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
|
||||
}
|
||||
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
|
||||
// TODO(krajorama,ywwg): pass ST when available in WAL.
|
||||
ok, chunkCreated = series.appendHistogram(a.storeST, 0, s.T, s.H, a.appendID, acc.appendChunkOpts)
|
||||
if ok {
|
||||
if s.T < acc.inOrderMint {
|
||||
acc.inOrderMint = s.T
|
||||
|
|
@ -1601,7 +1607,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
|
|||
// Sample is OOO and OOO handling is enabled
|
||||
// and the delta is within the OOO tolerance.
|
||||
var mmapRefs []chunks.ChunkDiskMapperRef
|
||||
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WAL.
|
||||
ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
|
||||
if chunkCreated {
|
||||
r, ok := acc.oooMmapMarkers[series.ref]
|
||||
if !ok || r != nil {
|
||||
|
|
@ -1649,7 +1656,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
|
|||
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
|
||||
}
|
||||
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
|
||||
// TODO(krajorama,ywwg): pass ST when available in WAL.
|
||||
ok, chunkCreated = series.appendFloatHistogram(a.storeST, 0, s.T, s.FH, a.appendID, acc.appendChunkOpts)
|
||||
if ok {
|
||||
if s.T < acc.inOrderMint {
|
||||
acc.inOrderMint = s.T
|
||||
|
|
@ -1799,18 +1807,18 @@ func (a *headAppenderBase) Commit() (err error) {
|
|||
}
|
||||
|
||||
// insert is like append, except it inserts. Used for OOO samples.
|
||||
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
|
||||
func (s *memSeries) insert(storeST bool, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
|
||||
if s.ooo == nil {
|
||||
s.ooo = &memSeriesOOOFields{}
|
||||
}
|
||||
c := s.ooo.oooHeadChunk
|
||||
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
|
||||
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
|
||||
c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger)
|
||||
c, mmapRefs = s.cutNewOOOHeadChunk(storeST, t, chunkDiskMapper, logger)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
ok := c.chunk.Insert(t, v, h, fh)
|
||||
ok := c.chunk.Insert(st, t, v, h, fh)
|
||||
if ok {
|
||||
if chunkCreated || t < c.minTime {
|
||||
c.minTime = t
|
||||
|
|
@ -1833,13 +1841,12 @@ type chunkOpts struct {
|
|||
// the appendID for isolation. (The appendID can be zero, which results in no
|
||||
// isolation for this append.)
|
||||
// Series lock must be held when calling.
|
||||
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
|
||||
func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.EncXOR, o)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
// TODO(krajorama): pass ST.
|
||||
s.app.Append(0, t, v)
|
||||
s.app.Append(st, t, v)
|
||||
|
||||
c.maxTime = t
|
||||
|
||||
|
|
@ -1859,14 +1866,14 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
|
|||
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
|
||||
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
|
||||
// consistent, we return chunkCreated=false in this case.
|
||||
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards and mmap used up chunks.
|
||||
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
|
||||
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o)
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.EncHistogram, o)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
|
|
@ -1881,8 +1888,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
prevApp = nil
|
||||
}
|
||||
|
||||
// TODO(krajorama): pass ST.
|
||||
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed
|
||||
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed
|
||||
|
||||
s.lastHistogramValue = h
|
||||
s.lastFloatHistogramValue = nil
|
||||
|
|
@ -1917,14 +1923,14 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
|
||||
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
|
||||
// consistent, we return chunkCreated=false in this case.
|
||||
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards and mmap used up chunks.
|
||||
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
|
||||
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o)
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.EncFloatHistogram, o)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
|
|
@ -1939,8 +1945,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
|||
prevApp = nil
|
||||
}
|
||||
|
||||
// TODO(krajorama): pass ST.
|
||||
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed.
|
||||
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed.
|
||||
|
||||
s.lastHistogramValue = nil
|
||||
s.lastFloatHistogramValue = fh
|
||||
|
|
@ -1974,7 +1979,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
|||
// number of samples they contain with a soft cap in bytes.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// This should be called only when appending data.
|
||||
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
// We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut
|
||||
// a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a
|
||||
// maximally-sized sample (19 bytes).
|
||||
|
|
@ -1988,7 +1993,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
return c, false, false
|
||||
}
|
||||
// There is no head chunk in this series yet, create the first chunk for the sample.
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
|
@ -1999,14 +2004,14 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
|
||||
// Check the chunk size, unless we just created it and if the chunk is too large, cut a new one.
|
||||
if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
if c.chunk.Encoding() != e {
|
||||
// The chunk encoding expected by this append is different than the head chunk's
|
||||
// encoding. So we cut a new chunk with the expected encoding.
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
|
@ -2031,7 +2036,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
// as we expect more chunks to come.
|
||||
// Note that next chunk will have its nextAt recalculated for the new rate.
|
||||
if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 {
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
|
@ -2042,7 +2047,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
// cut based on their size in bytes.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// This should be called only when appending data.
|
||||
func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
c = s.headChunks
|
||||
|
||||
if c == nil {
|
||||
|
|
@ -2051,7 +2056,7 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o
|
|||
return c, false, false
|
||||
}
|
||||
// There is no head chunk in this series yet, create the first chunk for the sample.
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
|
@ -2063,7 +2068,7 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o
|
|||
if c.chunk.Encoding() != e {
|
||||
// The chunk encoding expected by this append is different than the head chunk's
|
||||
// encoding. So we cut a new chunk with the expected encoding.
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
|
@ -2104,7 +2109,7 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o
|
|||
// increased or if the bucket/span count has increased.
|
||||
// Note that next chunk will have its nextAt recalculated for the new rate.
|
||||
if (t >= s.nextAt || numBytes >= targetBytes*2) && (numSamples >= chunkenc.MinSamplesPerHistogramChunk || t >= nextChunkRangeStart) {
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
|
@ -2129,7 +2134,7 @@ func computeChunkEndTime(start, cur, maxT int64, ratioToFull float64) int64 {
|
|||
return int64(float64(start) + float64(maxT-start)/math.Floor(n))
|
||||
}
|
||||
|
||||
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {
|
||||
func (s *memSeries) cutNewHeadChunk(storeST bool, mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {
|
||||
// When cutting a new head chunk we create a new memChunk instance with .prev
|
||||
// pointing at the current .headChunks, so it forms a linked list.
|
||||
// All but first headChunks list elements will be m-mapped as soon as possible
|
||||
|
|
@ -2142,12 +2147,16 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
|
|||
|
||||
if chunkenc.IsValidEncoding(e) {
|
||||
var err error
|
||||
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e)
|
||||
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e, storeST)
|
||||
if err != nil {
|
||||
panic(err) // This should never happen.
|
||||
}
|
||||
} else {
|
||||
s.headChunks.chunk = chunkenc.NewXORChunk()
|
||||
var err error
|
||||
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(chunkenc.EncXOR, storeST)
|
||||
if err != nil {
|
||||
panic(err) // This should never happen.
|
||||
}
|
||||
}
|
||||
|
||||
// Set upper bound on when the next chunk must be started. An earlier timestamp
|
||||
|
|
@ -2164,8 +2173,8 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
|
|||
|
||||
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
|
||||
// The caller must ensure that s is locked and s.ooo is not nil.
|
||||
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
|
||||
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger)
|
||||
func (s *memSeries) cutNewOOOHeadChunk(storeST bool, mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
|
||||
ref := s.mmapCurrentOOOHeadChunk(storeST, chunkDiskMapper, logger)
|
||||
|
||||
s.ooo.oooHeadChunk = &oooHeadChunk{
|
||||
chunk: NewOOOChunk(),
|
||||
|
|
@ -2177,12 +2186,12 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
|
|||
}
|
||||
|
||||
// s must be locked when calling.
|
||||
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef {
|
||||
func (s *memSeries) mmapCurrentOOOHeadChunk(storeST bool, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef {
|
||||
if s.ooo == nil || s.ooo.oooHeadChunk == nil {
|
||||
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
|
||||
return nil
|
||||
}
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, math.MinInt64, math.MaxInt64)
|
||||
if err != nil {
|
||||
handleChunkWriteError(err)
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -2993,7 +2993,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
|
|||
require.False(t, ok)
|
||||
require.NotNil(t, ms)
|
||||
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, chks, 1)
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ func TestMemSeries_chunk(t *testing.T) {
|
|||
|
||||
appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) {
|
||||
for i := start; i < end; i += chunkStep {
|
||||
ok, _ := s.append(i, float64(i), 0, chunkOpts{
|
||||
ok, _ := s.append(false, 0, i, float64(i), 0, chunkOpts{
|
||||
chunkDiskMapper: cdm,
|
||||
chunkRange: chunkRange,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
|
|
|
|||
|
|
@ -1492,7 +1492,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
|||
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
for i := 0; i < 4000; i += 5 {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
|
|
@ -1642,7 +1642,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
|||
if tc.mmappedChunks > 0 {
|
||||
headStart = (tc.mmappedChunks + 1) * chunkRange
|
||||
for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep {
|
||||
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
|
||||
ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
series.mmapChunks(chunkDiskMapper)
|
||||
|
|
@ -1652,7 +1652,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
|||
series.headChunks = nil
|
||||
} else {
|
||||
for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep {
|
||||
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
|
||||
ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed: %d", i)
|
||||
}
|
||||
}
|
||||
|
|
@ -2202,20 +2202,20 @@ func TestMemSeries_append(t *testing.T) {
|
|||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
// New chunk must correctly be cut at 1000.
|
||||
ok, chunkCreated := s.append(998, 1, 0, cOpts)
|
||||
ok, chunkCreated := s.append(false, 0, 998, 1, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "first sample created chunk")
|
||||
|
||||
ok, chunkCreated = s.append(999, 2, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 999, 2, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
|
||||
ok, chunkCreated = s.append(1000, 3, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 1000, 3, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "expected new chunk on boundary")
|
||||
|
||||
ok, chunkCreated = s.append(1001, 4, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 1001, 4, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
|
|
@ -2229,7 +2229,7 @@ func TestMemSeries_append(t *testing.T) {
|
|||
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
|
||||
// at approximately 120 samples per chunk.
|
||||
for i := 1; i < 1000; i++ {
|
||||
ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts)
|
||||
ok, _ := s.append(false, 0, 1001+int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
}
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
|
|
@ -2270,19 +2270,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
|||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
// New chunk must correctly be cut at 1000.
|
||||
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts)
|
||||
ok, chunkCreated := s.appendHistogram(false, 0, 998, histograms[0], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "first sample created chunk")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts)
|
||||
ok, chunkCreated = s.appendHistogram(false, 0, 999, histograms[1], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts)
|
||||
ok, chunkCreated = s.appendHistogram(false, 0, 1000, histograms[2], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "expected new chunk on boundary")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts)
|
||||
ok, chunkCreated = s.appendHistogram(false, 0, 1001, histograms[3], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
|
|
@ -2293,7 +2293,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
|||
require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts)
|
||||
ok, chunkCreated = s.appendHistogram(false, 0, 1002, histogramWithOneMoreBucket, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
|
||||
|
||||
|
|
@ -2328,7 +2328,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
|||
var nextTs int64
|
||||
var totalAppendedSamples int
|
||||
for i := range samplesPerChunk / 4 {
|
||||
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
|
||||
ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts)
|
||||
require.Truef(t, ok, "slow sample %d was not appended", i)
|
||||
nextTs += slowRate
|
||||
totalAppendedSamples++
|
||||
|
|
@ -2337,12 +2337,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
|||
|
||||
// Suddenly, the rate increases and we receive a sample every millisecond.
|
||||
for i := range math.MaxUint16 {
|
||||
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
|
||||
ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts)
|
||||
require.Truef(t, ok, "quick sample %d was not appended", i)
|
||||
nextTs++
|
||||
totalAppendedSamples++
|
||||
}
|
||||
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts)
|
||||
ok, chunkCreated := s.append(false, 0, DefaultBlockDuration, float64(0), 0, cOpts)
|
||||
require.True(t, ok, "new chunk sample was not appended")
|
||||
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
|
||||
|
||||
|
|
@ -2371,18 +2371,18 @@ func TestGCChunkAccess(t *testing.T) {
|
|||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
|
||||
// Appending 2 samples for the first chunk.
|
||||
ok, chunkCreated := s.append(0, 0, 0, cOpts)
|
||||
ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(999, 999, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
// A new chunks should be created here as it's beyond the chunk range.
|
||||
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
|
|
@ -2427,18 +2427,18 @@ func TestGCSeriesAccess(t *testing.T) {
|
|||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
|
||||
// Appending 2 samples for the first chunk.
|
||||
ok, chunkCreated := s.append(0, 0, 0, cOpts)
|
||||
ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(999, 999, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
// A new chunks should be created here as it's beyond the chunk range.
|
||||
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
|
|
@ -2775,10 +2775,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
|||
require.True(t, created, "series was not created")
|
||||
|
||||
for i := range 7 {
|
||||
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
|
||||
ok, chunkCreated := s.append(false, 0, int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunk was not created")
|
||||
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
|
||||
ok, chunkCreated = s.append(false, 0, int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunk was created")
|
||||
h.chunkDiskMapper.CutNewFile()
|
||||
|
|
@ -3118,7 +3118,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
|||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
|
||||
ok, _ := s.append(0, 0, 0, cOpts)
|
||||
ok, _ := s.append(false, 0, 0, 0, 0, cOpts)
|
||||
require.True(t, ok, "Series append failed.")
|
||||
require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an appendID after append with appendID=0.")
|
||||
}
|
||||
|
|
@ -3678,7 +3678,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
|
|||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
for i := range 7 {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
|
||||
|
|
@ -5569,7 +5569,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
|
|||
require.False(t, ok)
|
||||
require.NotNil(t, ms)
|
||||
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, chks, 1)
|
||||
|
||||
|
|
|
|||
|
|
@ -115,8 +115,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
}()
|
||||
|
||||
wg.Add(concurrency)
|
||||
storeST := h.opts.EnableSTStorage.Load()
|
||||
for i := range concurrency {
|
||||
processors[i].setup()
|
||||
processors[i].setup(storeST)
|
||||
|
||||
go func(wp *walSubsetProcessor) {
|
||||
missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
|
||||
|
|
@ -576,6 +577,7 @@ type walSubsetProcessor struct {
|
|||
input chan walSubsetProcessorInputItem
|
||||
output chan []record.RefSample
|
||||
histogramsOutput chan []histogramRecord
|
||||
storeST bool
|
||||
}
|
||||
|
||||
type walSubsetProcessorInputItem struct {
|
||||
|
|
@ -586,10 +588,11 @@ type walSubsetProcessorInputItem struct {
|
|||
deletedSeriesRefs []chunks.HeadSeriesRef
|
||||
}
|
||||
|
||||
func (wp *walSubsetProcessor) setup() {
|
||||
func (wp *walSubsetProcessor) setup(storeST bool) {
|
||||
wp.input = make(chan walSubsetProcessorInputItem, 300)
|
||||
wp.output = make(chan []record.RefSample, 300)
|
||||
wp.histogramsOutput = make(chan []histogramRecord, 300)
|
||||
wp.storeST = storeST
|
||||
}
|
||||
|
||||
func (wp *walSubsetProcessor) closeAndDrain() {
|
||||
|
|
@ -666,7 +669,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
|||
h.numStaleSeries.Dec()
|
||||
}
|
||||
|
||||
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
if _, chunkCreated := ms.append(wp.storeST, 0, s.T, s.V, 0, appendChunkOpts); chunkCreated {
|
||||
h.metrics.chunksCreated.Inc()
|
||||
h.metrics.chunks.Inc()
|
||||
_ = ms.mmapChunks(h.chunkDiskMapper)
|
||||
|
|
@ -703,14 +707,16 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
|||
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum)
|
||||
}
|
||||
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
_, chunkCreated = ms.appendHistogram(wp.storeST, 0, s.t, s.h, 0, appendChunkOpts)
|
||||
} else {
|
||||
newlyStale = value.IsStaleNaN(s.fh.Sum)
|
||||
if ms.lastFloatHistogramValue != nil {
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum)
|
||||
}
|
||||
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
_, chunkCreated = ms.appendFloatHistogram(wp.storeST, 0, s.t, s.fh, 0, appendChunkOpts)
|
||||
}
|
||||
if newlyStale {
|
||||
h.numStaleSeries.Inc()
|
||||
|
|
@ -779,8 +785,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
|||
}()
|
||||
|
||||
wg.Add(concurrency)
|
||||
storeST := h.opts.EnableSTStorage.Load()
|
||||
for i := range concurrency {
|
||||
processors[i].setup()
|
||||
processors[i].setup(storeST)
|
||||
|
||||
go func(wp *wblSubsetProcessor) {
|
||||
missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h)
|
||||
|
|
@ -1025,6 +1032,7 @@ type wblSubsetProcessor struct {
|
|||
input chan wblSubsetProcessorInputItem
|
||||
output chan []record.RefSample
|
||||
histogramsOutput chan []histogramRecord
|
||||
storeST bool
|
||||
}
|
||||
|
||||
type wblSubsetProcessorInputItem struct {
|
||||
|
|
@ -1033,10 +1041,11 @@ type wblSubsetProcessorInputItem struct {
|
|||
histogramSamples []histogramRecord
|
||||
}
|
||||
|
||||
func (wp *wblSubsetProcessor) setup() {
|
||||
func (wp *wblSubsetProcessor) setup(storeST bool) {
|
||||
wp.output = make(chan []record.RefSample, 300)
|
||||
wp.histogramsOutput = make(chan []histogramRecord, 300)
|
||||
wp.input = make(chan wblSubsetProcessorInputItem, 300)
|
||||
wp.storeST = storeST
|
||||
}
|
||||
|
||||
func (wp *wblSubsetProcessor) closeAndDrain() {
|
||||
|
|
@ -1096,7 +1105,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
|
|||
missingSeries[s.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
ok, chunkCreated, _ := ms.insert(wp.storeST, 0, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
if chunkCreated {
|
||||
h.metrics.chunksCreated.Inc()
|
||||
h.metrics.chunks.Inc()
|
||||
|
|
@ -1124,9 +1134,11 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
|
|||
var chunkCreated bool
|
||||
var ok bool
|
||||
if s.h != nil {
|
||||
ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
} else {
|
||||
ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
}
|
||||
if chunkCreated {
|
||||
h.metrics.chunksCreated.Inc()
|
||||
|
|
|
|||
|
|
@ -34,14 +34,13 @@ func NewOOOChunk() *OOOChunk {
|
|||
|
||||
// Insert inserts the sample such that order is maintained.
|
||||
// Returns false if insert was not possible due to the same timestamp already existing.
|
||||
func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
|
||||
func (o *OOOChunk) Insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
|
||||
// Although out-of-order samples can be out-of-order amongst themselves, we
|
||||
// are opinionated and expect them to be usually in-order meaning we could
|
||||
// try to append at the end first if the new timestamp is higher than the
|
||||
// last known timestamp.
|
||||
if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t {
|
||||
// TODO(krajorama): pass ST.
|
||||
o.samples = append(o.samples, sample{0, t, v, h, fh})
|
||||
o.samples = append(o.samples, sample{st, t, v, h, fh})
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -50,8 +49,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog
|
|||
|
||||
if i >= len(o.samples) {
|
||||
// none found. append it at the end
|
||||
// TODO(krajorama): pass ST.
|
||||
o.samples = append(o.samples, sample{0, t, v, h, fh})
|
||||
o.samples = append(o.samples, sample{st, t, v, h, fh})
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -63,8 +61,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog
|
|||
// Expand length by 1 to make room. use a zero sample, we will overwrite it anyway.
|
||||
o.samples = append(o.samples, sample{})
|
||||
copy(o.samples[i+1:], o.samples[i:])
|
||||
// TODO(krajorama): pass ST.
|
||||
o.samples[i] = sample{0, t, v, h, fh}
|
||||
o.samples[i] = sample{st, t, v, h, fh}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
@ -76,7 +73,7 @@ func (o *OOOChunk) NumSamples() int {
|
|||
// ToEncodedChunks returns chunks with the samples in the OOOChunk.
|
||||
//
|
||||
//nolint:revive
|
||||
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) {
|
||||
func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memChunk, err error) {
|
||||
if len(o.samples) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
@ -97,10 +94,17 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
|
|||
break
|
||||
}
|
||||
encoding := chunkenc.EncXOR
|
||||
if s.h != nil {
|
||||
switch {
|
||||
case s.h != nil:
|
||||
// TODO(krajorama): use ST capable histogram chunk.
|
||||
encoding = chunkenc.EncHistogram
|
||||
} else if s.fh != nil {
|
||||
case s.fh != nil:
|
||||
// TODO(krajorama): use ST capable float histogram chunk.
|
||||
encoding = chunkenc.EncFloatHistogram
|
||||
default:
|
||||
if storeST {
|
||||
encoding = chunkenc.EncXOROptST
|
||||
}
|
||||
}
|
||||
|
||||
// prevApp is the appender for the previous sample.
|
||||
|
|
@ -111,15 +115,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
|
|||
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
|
||||
}
|
||||
cmint = s.t
|
||||
switch encoding {
|
||||
case chunkenc.EncXOR:
|
||||
chunk = chunkenc.NewXORChunk()
|
||||
case chunkenc.EncHistogram:
|
||||
chunk = chunkenc.NewHistogramChunk()
|
||||
case chunkenc.EncFloatHistogram:
|
||||
chunk = chunkenc.NewFloatHistogramChunk()
|
||||
default:
|
||||
chunk = chunkenc.NewXORChunk()
|
||||
chunk, err = chunkenc.NewEmptyChunk(encoding, storeST)
|
||||
if err != nil {
|
||||
// This should never happen. No point using a default type as
|
||||
// calling the wrong append function would panic.
|
||||
return chks, err
|
||||
}
|
||||
app, err = chunk.Appender()
|
||||
if err != nil {
|
||||
|
|
@ -127,18 +127,17 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
|
|||
}
|
||||
}
|
||||
switch encoding {
|
||||
case chunkenc.EncXOR:
|
||||
// TODO(krajorama): pass ST.
|
||||
app.Append(0, s.t, s.f)
|
||||
case chunkenc.EncXOR, chunkenc.EncXOROptST:
|
||||
app.Append(s.st, s.t, s.f)
|
||||
case chunkenc.EncHistogram:
|
||||
// TODO(krajorama): handle ST capable histogram chunk.
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
|
||||
var (
|
||||
newChunk chunkenc.Chunk
|
||||
recoded bool
|
||||
)
|
||||
// TODO(krajorama): pass ST.
|
||||
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false)
|
||||
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.st, s.t, s.h, false)
|
||||
if newChunk != nil { // A new chunk was allocated.
|
||||
if !recoded {
|
||||
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
|
||||
|
|
@ -147,14 +146,14 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
|
|||
chunk = newChunk
|
||||
}
|
||||
case chunkenc.EncFloatHistogram:
|
||||
// TODO(krajorama): handle ST capable float histogram chunk.
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
|
||||
var (
|
||||
newChunk chunkenc.Chunk
|
||||
recoded bool
|
||||
)
|
||||
// TODO(krajorama): pass ST.
|
||||
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false)
|
||||
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.st, s.t, s.fh, false)
|
||||
if newChunk != nil { // A new chunk was allocated.
|
||||
if !recoded {
|
||||
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
|
|||
*chks = (*chks)[:0]
|
||||
|
||||
if s.ooo != nil {
|
||||
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
|
||||
return getOOOSeriesChunks(s, oh.head.opts.EnableSTStorage.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
|
||||
}
|
||||
*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
|
||||
return nil
|
||||
|
|
@ -88,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
|
|||
//
|
||||
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
|
||||
// the oooHeadChunk will not be considered.
|
||||
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
|
||||
func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
|
||||
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
|
||||
|
||||
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
||||
|
|
@ -106,7 +106,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
|
||||
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
|
||||
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, c.minTime, c.maxTime)
|
||||
if err != nil {
|
||||
handleChunkWriteError(err)
|
||||
return nil
|
||||
|
|
@ -347,7 +347,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
|
|||
}
|
||||
|
||||
var lastMmapRef chunks.ChunkDiskMapperRef
|
||||
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger)
|
||||
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.opts.EnableSTStorage.Load(), head.chunkDiskMapper, head.logger)
|
||||
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
|
||||
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
|
||||
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
|
||||
|
|
@ -481,7 +481,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
|
|||
return nil
|
||||
}
|
||||
|
||||
return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
|
||||
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableSTStorage.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
|
||||
}
|
||||
|
||||
func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) {
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ func testOOOInsert(t *testing.T,
|
|||
chunk.samples = make([]sample, numPreExisting)
|
||||
chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc)
|
||||
newSample := sampleFunc(valOdd(insertPos))
|
||||
chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh)
|
||||
chunk.Insert(newSample.st, newSample.t, newSample.f, newSample.h, newSample.fh)
|
||||
|
||||
var expSamples []sample
|
||||
// Our expected new samples slice, will be first the original samples.
|
||||
|
|
@ -145,7 +145,7 @@ func testOOOInsertDuplicate(t *testing.T,
|
|||
dupSample := chunk.samples[dupPos]
|
||||
dupSample.f = 0.123
|
||||
|
||||
ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
|
||||
ok := chunk.Insert(dupSample.st, dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
|
||||
|
||||
expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change.
|
||||
require.False(t, ok)
|
||||
|
|
@ -252,17 +252,17 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
|
|||
for _, s := range tc.samples {
|
||||
switch s.Type() {
|
||||
case chunkenc.ValFloat:
|
||||
oooChunk.Insert(s.t, s.f, nil, nil)
|
||||
oooChunk.Insert(s.st, s.t, s.f, nil, nil)
|
||||
case chunkenc.ValHistogram:
|
||||
oooChunk.Insert(s.t, 0, s.h.Copy(), nil)
|
||||
oooChunk.Insert(s.st, s.t, 0, s.h.Copy(), nil)
|
||||
case chunkenc.ValFloatHistogram:
|
||||
oooChunk.Insert(s.t, 0, nil, s.fh.Copy())
|
||||
oooChunk.Insert(s.st, s.t, 0, nil, s.fh.Copy())
|
||||
default:
|
||||
t.Fatalf("unexpected sample type %d", s.Type())
|
||||
}
|
||||
}
|
||||
|
||||
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
|
||||
chunks, err := oooChunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, chunks, len(tc.expectedChunks), "number of chunks")
|
||||
sampleIndex := 0
|
||||
|
|
|
|||
Loading…
Reference in a new issue