diff --git a/tsdb/chunkenc/benchmark_test.go b/tsdb/chunkenc/benchmark_test.go index c1335d1e44..dc0d66107f 100644 --- a/tsdb/chunkenc/benchmark_test.go +++ b/tsdb/chunkenc/benchmark_test.go @@ -263,7 +263,7 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl for _, f := range []fmtCase{ {name: "XOR", newChunkFn: func() Chunk { return NewXORChunk() }, stUnsupported: true}, - {name: "XOR_OPT_ST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }}, + {name: "XOR_OPT_ST", newChunkFn: func() Chunk { return NewXORSTChunk() }}, } { for _, s := range sampleCases { b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) { diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 6fb8de2a77..246a9aafa8 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -30,7 +30,7 @@ const ( EncXOR EncHistogram EncFloatHistogram - EncXOROptST + EncXORST ) func (e Encoding) String() string { @@ -43,7 +43,7 @@ func (e Encoding) String() string { return "histogram" case EncFloatHistogram: return "floathistogram" - case EncXOROptST: + case EncXORST: return "XOR-start-timestamp" } return "" @@ -51,7 +51,7 @@ func (e Encoding) String() string { // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOROptST + return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXORST } const ( @@ -325,7 +325,7 @@ func NewPool() Pool { }, xoroptst: sync.Pool{ New: func() any { - return &XorOptSTChunk{b: bstream{}} + return &XorSTChunk{b: bstream{}} }, }, } @@ -340,8 +340,8 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { c = p.histogram.Get().(*HistogramChunk) case EncFloatHistogram: c = p.floatHistogram.Get().(*FloatHistogramChunk) - case EncXOROptST: - c = p.xoroptst.Get().(*XorOptSTChunk) + case EncXORST: + c = p.xoroptst.Get().(*XorSTChunk) default: return nil, fmt.Errorf("invalid chunk encoding %q", e) } @@ -363,8 +363,8 @@ func (p *pool) Put(c Chunk) error { case EncFloatHistogram: _, ok = c.(*FloatHistogramChunk) sp = &p.floatHistogram - case EncXOROptST: - _, ok = c.(*XorOptSTChunk) + case EncXORST: + _, ok = c.(*XorSTChunk) sp = &p.xoroptst default: return fmt.Errorf("invalid chunk encoding %q", c.Encoding()) @@ -392,8 +392,8 @@ func FromData(e Encoding, d []byte) (Chunk, error) { return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil case EncFloatHistogram: return &FloatHistogramChunk{b: bstream{count: 0, stream: d}}, nil - case EncXOROptST: - return &XorOptSTChunk{b: bstream{count: 0, stream: d}}, nil + case EncXORST: + return &XorSTChunk{b: bstream{count: 0, stream: d}}, nil } return nil, fmt.Errorf("invalid chunk encoding %q", e) } @@ -407,8 +407,8 @@ func NewEmptyChunk(e Encoding) (Chunk, error) { return NewHistogramChunk(), nil case EncFloatHistogram: return NewFloatHistogramChunk(), nil - case EncXOROptST: - return NewXOROptSTChunk(), nil + case EncXORST: + return NewXORSTChunk(), nil } return nil, fmt.Errorf("invalid chunk encoding %q", e) } diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 1717300288..08825a4b4c 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -34,7 +34,7 @@ func TestChunk(t *testing.T) { factory func() Chunk }{ {encoding: EncXOR, supportsST: false, factory: func() Chunk { return NewXORChunk() }}, - {encoding: EncXOROptST, supportsST: true, factory: func() Chunk { return NewXOROptSTChunk() }}, + {encoding: EncXORST, supportsST: true, factory: func() Chunk { return NewXORSTChunk() }}, } for _, tc := range testcases { t.Run(fmt.Sprintf("%v", tc.encoding), func(t *testing.T) { @@ -144,7 +144,7 @@ func TestPool(t *testing.T) { }, { name: "xor opt st", - encoding: EncXOROptST, + encoding: EncXORST, }, { name: "invalid encoding", @@ -167,8 +167,8 @@ func TestPool(t *testing.T) { b = &c.(*HistogramChunk).b case EncFloatHistogram: b = &c.(*FloatHistogramChunk).b - case EncXOROptST: - b = &c.(*XorOptSTChunk).b + case EncXORST: + b = &c.(*XorSTChunk).b default: b = &c.(*XORChunk).b } diff --git a/tsdb/chunkenc/xoroptst.go b/tsdb/chunkenc/xorst.go similarity index 68% rename from tsdb/chunkenc/xoroptst.go rename to tsdb/chunkenc/xorst.go index dcc8bf6c41..c0f131a935 100644 --- a/tsdb/chunkenc/xoroptst.go +++ b/tsdb/chunkenc/xorst.go @@ -22,13 +22,9 @@ import ( const ( chunkSTHeaderSize = 1 - maxFirstSTChangeOn = 0x7F + maxFirstSTChangeOn = 0xFF ) -func writeHeaderFirstSTKnown(b []byte) { - b[0] = 0x80 -} - func writeHeaderFirstSTChangeOn(b []byte, firstSTChangeOn uint16) { // First bit indicates the initial ST value. // Here we save the sample number from where the first change occurs in the @@ -38,57 +34,46 @@ func writeHeaderFirstSTChangeOn(b []byte, firstSTChangeOn uint16) { // This should never happen, would cause corruption (ST already skipped but shouldn't). return } - b[0] |= uint8(firstSTChangeOn) + b[0] = uint8(firstSTChangeOn) } -func readSTHeader(b []byte) (firstSTKnown bool, firstSTChangeOn uint8) { - if b[0] == 0x00 { - return false, 0 - } - if b[0] == 0x80 { - return true, 0 - } - mask := byte(0x80) - if b[0]&mask != 0 { - firstSTKnown = true - } - mask = 0x7F - return firstSTKnown, b[0] & mask +func readSTHeader(b []byte) uint8 { + return b[0] } -// XorOptSTChunk holds XOR enncoded samples with optional start time (ST) +// XorSTChunk holds XOR enncoded samples with optional start time (ST) // per chunk or per sample. See tsdb/docs/format/chunks.md for details. -type XorOptSTChunk struct { +type XorSTChunk struct { b bstream } -// NewXOROptSTChunk returns a new chunk with XORv2 encoding. -func NewXOROptSTChunk() *XorOptSTChunk { +// NewXORSTChunk returns a new chunk with XORv2 encoding. +func NewXORSTChunk() *XorSTChunk { b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize) - return &XorOptSTChunk{b: bstream{stream: b, count: 0}} + return &XorSTChunk{b: bstream{stream: b, count: 0}} } -func (c *XorOptSTChunk) Reset(stream []byte) { +func (c *XorSTChunk) Reset(stream []byte) { c.b.Reset(stream) } // Encoding returns the encoding type. -func (*XorOptSTChunk) Encoding() Encoding { - return EncXOROptST +func (*XorSTChunk) Encoding() Encoding { + return EncXORST } // Bytes returns the underlying byte slice of the chunk. -func (c *XorOptSTChunk) Bytes() []byte { +func (c *XorSTChunk) Bytes() []byte { return c.b.bytes() } // NumSamples returns the number of samples in the chunk. -func (c *XorOptSTChunk) NumSamples() int { +func (c *XorSTChunk) NumSamples() int { return int(binary.BigEndian.Uint16(c.Bytes())) } // Compact implements the Chunk interface. -func (c *XorOptSTChunk) Compact() { +func (c *XorSTChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { buf := make([]byte, l) copy(buf, c.b.stream) @@ -99,9 +84,9 @@ func (c *XorOptSTChunk) Compact() { // Appender implements the Chunk interface. // It is not valid to call Appender() multiple times concurrently or to use multiple // Appenders on the same chunk. -func (c *XorOptSTChunk) Appender() (Appender, error) { +func (c *XorSTChunk) Appender() (Appender, error) { if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty. - return &xorOptSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil + return &xorSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil } it := c.iterator(nil) @@ -114,7 +99,7 @@ func (c *XorOptSTChunk) Appender() (Appender, error) { return nil, err } - a := &xorOptSTAppender{ + a := &xorSTAppender{ b: &c.b, st: it.st, t: it.t, @@ -125,16 +110,15 @@ func (c *XorOptSTChunk) Appender() (Appender, error) { trailing: it.trailing, numTotal: it.numTotal, - firstSTKnown: it.firstSTKnown, - firstSTChangeOn: uint16(it.firstSTChangeOn), + firstSTChangeOn: it.firstSTChangeOn, } return a, nil } -func (c *XorOptSTChunk) iterator(it Iterator) *xorOptSTtIterator { - xorIter, ok := it.(*xorOptSTtIterator) +func (c *XorSTChunk) iterator(it Iterator) *xorSTIterator { + xorIter, ok := it.(*xorSTIterator) if !ok { - xorIter = &xorOptSTtIterator{} + xorIter = &xorSTIterator{} } xorIter.Reset(c.b.bytes()) @@ -145,41 +129,39 @@ func (c *XorOptSTChunk) iterator(it Iterator) *xorOptSTtIterator { // Iterator() must not be called concurrently with any modifications to the chunk, // but after it returns you can use an Iterator concurrently with an Appender or // other Iterators. -func (c *XorOptSTChunk) Iterator(it Iterator) Iterator { +func (c *XorSTChunk) Iterator(it Iterator) Iterator { return c.iterator(it) } -type xorOptSTAppender struct { +type xorSTAppender struct { b *bstream numTotal uint16 firstSTChangeOn uint16 leading uint8 trailing uint8 - firstSTKnown bool st, t int64 v float64 stDelta int64 tDelta uint64 } -func (a *xorOptSTAppender) writeVDelta(v float64) { +func (a *xorSTAppender) writeVDelta(v float64) { xorWrite(a.b, v, a.v, &a.leading, &a.trailing) } -func (*xorOptSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) { +func (*xorSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) { panic("appended a histogram sample to a float chunk") } -func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) { +func (*xorSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) { panic("appended a float histogram sample to a float chunk") } -type xorOptSTtIterator struct { +type xorSTIterator struct { br bstreamReader numTotal uint16 - firstSTKnown bool - firstSTChangeOn uint8 + firstSTChangeOn uint16 leading uint8 trailing uint8 @@ -193,7 +175,7 @@ type xorOptSTtIterator struct { err error } -func (it *xorOptSTtIterator) Seek(t int64) ValueType { +func (it *xorSTIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone } @@ -206,35 +188,35 @@ func (it *xorOptSTtIterator) Seek(t int64) ValueType { return ValFloat } -func (it *xorOptSTtIterator) At() (int64, float64) { +func (it *xorSTIterator) At() (int64, float64) { return it.t, it.val } -func (*xorOptSTtIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { +func (*xorSTIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic("cannot call xorIterator.AtHistogram") } -func (*xorOptSTtIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { +func (*xorSTIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { panic("cannot call xorIterator.AtFloatHistogram") } -func (it *xorOptSTtIterator) AtT() int64 { +func (it *xorSTIterator) AtT() int64 { return it.t } -func (it *xorOptSTtIterator) AtST() int64 { +func (it *xorSTIterator) AtST() int64 { return it.st } -func (it *xorOptSTtIterator) Err() error { +func (it *xorSTIterator) Err() error { return it.err } -func (it *xorOptSTtIterator) Reset(b []byte) { +func (it *xorSTIterator) Reset(b []byte) { // We skip initial headers for actual samples. it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:]) it.numTotal = binary.BigEndian.Uint16(b) - it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[chunkHeaderSize:]) + it.firstSTChangeOn = uint16(readSTHeader(b[chunkHeaderSize:])) it.numRead = 0 it.st = 0 it.t = 0 @@ -246,74 +228,12 @@ func (it *xorOptSTtIterator) Reset(b []byte) { it.err = nil } -func (a *xorOptSTAppender) Append(st, t int64, v float64) { - if st == 0 && a.numTotal != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown { - // Fast path for no ST usage at all. - // Same as classic XOR chunk appender. - - var tDelta uint64 - - switch a.numTotal { - case 0: - buf := make([]byte, binary.MaxVarintLen64) - for _, b := range buf[:binary.PutVarint(buf, t)] { - a.b.writeByte(b) - } - a.b.writeBits(math.Float64bits(v), 64) - case 1: - buf := make([]byte, binary.MaxVarintLen64) - tDelta = uint64(t - a.t) - for _, b := range buf[:binary.PutUvarint(buf, tDelta)] { - a.b.writeByte(b) - } - a.writeVDelta(v) - default: - tDelta = uint64(t - a.t) - dod := int64(tDelta - a.tDelta) - - // Gorilla has a max resolution of seconds, Prometheus milliseconds. - // Thus we use higher value range steps with larger bit size. - // - // TODO(beorn7): This seems to needlessly jump to large bit - // sizes even for very small deviations from zero. Timestamp - // compression can probably benefit from some smaller bit - // buckets. See also what was done for histogram encoding in - // varbit.go. - switch { - case dod == 0: - a.b.writeBit(zero) - case bitRange(dod, 14): - a.b.writeByte(0b10<<6 | (uint8(dod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod. - a.b.writeByte(uint8(dod)) // Bottom 8 bits of dod. - case bitRange(dod, 17): - a.b.writeBits(0b110, 3) - a.b.writeBits(uint64(dod), 17) - case bitRange(dod, 20): - a.b.writeBits(0b1110, 4) - a.b.writeBits(uint64(dod), 20) - default: - a.b.writeBits(0b1111, 4) - a.b.writeBits(uint64(dod), 64) - } - - a.writeVDelta(v) - } - - a.t = t - a.v = v - a.tDelta = tDelta - a.numTotal++ - binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) - - return - } - +func (a *xorSTAppender) Append(st, t int64, v float64) { var ( stDelta int64 tDelta uint64 ) - // Slow path for ST usage. switch a.numTotal { case 0: buf := make([]byte, binary.MaxVarintLen64) @@ -321,8 +241,6 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) { for _, b := range buf[:binary.PutVarint(buf, st)] { a.b.writeByte(b) } - a.firstSTKnown = true - writeHeaderFirstSTKnown(a.b.bytes()[chunkHeaderSize:]) for _, b := range buf[:binary.PutVarint(buf, t)] { a.b.writeByte(b) @@ -432,25 +350,23 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) { binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) } -func (it *xorOptSTtIterator) retErr(err error) ValueType { +func (it *xorSTIterator) retErr(err error) ValueType { it.err = err return ValNone } -func (it *xorOptSTtIterator) Next() ValueType { +func (it *xorSTIterator) Next() ValueType { if it.err != nil || it.numRead == it.numTotal { return ValNone } if it.numRead == 0 { - // Optional ST read. - if it.firstSTKnown { - st, err := binary.ReadVarint(&it.br) - if err != nil { - return it.retErr(err) - } - it.st = st + // Always read ST for the first sample. + st, err := binary.ReadVarint(&it.br) + if err != nil { + return it.retErr(err) } + it.st = st t, err := binary.ReadVarint(&it.br) if err != nil { return it.retErr(err) @@ -486,8 +402,7 @@ func (it *xorOptSTtIterator) Next() ValueType { return it.readValue() } - if it.firstSTChangeOn > 0 && it.numRead >= uint16(it.firstSTChangeOn) { - // Inlined readClassicVarbitInt(&it.br) + if it.firstSTChangeOn > 0 && it.numRead >= it.firstSTChangeOn { var d byte // read delta-of-delta for range 4 { @@ -606,7 +521,7 @@ func (it *xorOptSTtIterator) Next() ValueType { return it.readValue() } -func (it *xorOptSTtIterator) readValue() ValueType { +func (it *xorSTIterator) readValue() ValueType { err := xorRead(&it.br, &it.val, &it.leading, &it.trailing) if err != nil { return it.retErr(err) diff --git a/tsdb/chunkenc/xoroptst_test.go b/tsdb/chunkenc/xorst_test.go similarity index 81% rename from tsdb/chunkenc/xoroptst_test.go rename to tsdb/chunkenc/xorst_test.go index 15b87993de..aa77b968de 100644 --- a/tsdb/chunkenc/xoroptst_test.go +++ b/tsdb/chunkenc/xorst_test.go @@ -21,7 +21,7 @@ import ( func TestXorOptSTChunk(t *testing.T) { testChunkSTHandling(t, ValFloat, func() Chunk { - return NewXOROptSTChunk() + return NewXORSTChunk() }, ) } @@ -29,7 +29,7 @@ func TestXorOptSTChunk(t *testing.T) { func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) { const afterMax = maxFirstSTChangeOn + 3 t.Run("zero ST", func(t *testing.T) { - chunk := NewXOROptSTChunk() + chunk := NewXORSTChunk() app, err := chunk.Appender() require.NoError(t, err) for i := range afterMax { @@ -51,7 +51,7 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) { }) t.Run("non-zero ST after 127", func(t *testing.T) { - chunk := NewXOROptSTChunk() + chunk := NewXORSTChunk() app, err := chunk.Appender() require.NoError(t, err) for i := range afterMax { @@ -83,26 +83,16 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) { func TestXorOptSTChunk_STHeader(t *testing.T) { b := make([]byte, 1) - writeHeaderFirstSTKnown(b) - firstSTKnown, firstSTChangeOn := readSTHeader(b) - require.True(t, firstSTKnown) - require.Equal(t, uint8(0), firstSTChangeOn) - - b = make([]byte, 1) - firstSTKnown, firstSTChangeOn = readSTHeader(b) - require.False(t, firstSTKnown) + firstSTChangeOn := readSTHeader(b) require.Equal(t, uint8(0), firstSTChangeOn) b = make([]byte, 1) writeHeaderFirstSTChangeOn(b, 1) - firstSTKnown, firstSTChangeOn = readSTHeader(b) - require.False(t, firstSTKnown) + firstSTChangeOn = readSTHeader(b) require.Equal(t, uint8(1), firstSTChangeOn) b = make([]byte, 1) - writeHeaderFirstSTKnown(b) writeHeaderFirstSTChangeOn(b, 119) - firstSTKnown, firstSTChangeOn = readSTHeader(b) - require.True(t, firstSTKnown) + firstSTChangeOn = readSTHeader(b) require.Equal(t, uint8(119), firstSTChangeOn) }