Change EncXOROptST to EncXORST where ST is mandatory

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2026-02-01 09:35:06 +01:00
parent 9fc9033ad8
commit 4be3eaad85
No known key found for this signature in database
GPG key ID: 47A8F9CE80FD7C7F
5 changed files with 71 additions and 166 deletions

View file

@ -263,7 +263,7 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
for _, f := range []fmtCase{
{name: "XOR", newChunkFn: func() Chunk { return NewXORChunk() }, stUnsupported: true},
{name: "XOR_OPT_ST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }},
{name: "XOR_OPT_ST", newChunkFn: func() Chunk { return NewXORSTChunk() }},
} {
for _, s := range sampleCases {
b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) {

View file

@ -30,7 +30,7 @@ const (
EncXOR
EncHistogram
EncFloatHistogram
EncXOROptST
EncXORST
)
func (e Encoding) String() string {
@ -43,7 +43,7 @@ func (e Encoding) String() string {
return "histogram"
case EncFloatHistogram:
return "floathistogram"
case EncXOROptST:
case EncXORST:
return "XOR-start-timestamp"
}
return "<unknown>"
@ -51,7 +51,7 @@ func (e Encoding) String() string {
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOROptST
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXORST
}
const (
@ -325,7 +325,7 @@ func NewPool() Pool {
},
xoroptst: sync.Pool{
New: func() any {
return &XorOptSTChunk{b: bstream{}}
return &XorSTChunk{b: bstream{}}
},
},
}
@ -340,8 +340,8 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
c = p.histogram.Get().(*HistogramChunk)
case EncFloatHistogram:
c = p.floatHistogram.Get().(*FloatHistogramChunk)
case EncXOROptST:
c = p.xoroptst.Get().(*XorOptSTChunk)
case EncXORST:
c = p.xoroptst.Get().(*XorSTChunk)
default:
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
@ -363,8 +363,8 @@ func (p *pool) Put(c Chunk) error {
case EncFloatHistogram:
_, ok = c.(*FloatHistogramChunk)
sp = &p.floatHistogram
case EncXOROptST:
_, ok = c.(*XorOptSTChunk)
case EncXORST:
_, ok = c.(*XorSTChunk)
sp = &p.xoroptst
default:
return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
@ -392,8 +392,8 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil
case EncFloatHistogram:
return &FloatHistogramChunk{b: bstream{count: 0, stream: d}}, nil
case EncXOROptST:
return &XorOptSTChunk{b: bstream{count: 0, stream: d}}, nil
case EncXORST:
return &XorSTChunk{b: bstream{count: 0, stream: d}}, nil
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
@ -407,8 +407,8 @@ func NewEmptyChunk(e Encoding) (Chunk, error) {
return NewHistogramChunk(), nil
case EncFloatHistogram:
return NewFloatHistogramChunk(), nil
case EncXOROptST:
return NewXOROptSTChunk(), nil
case EncXORST:
return NewXORSTChunk(), nil
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}

View file

@ -34,7 +34,7 @@ func TestChunk(t *testing.T) {
factory func() Chunk
}{
{encoding: EncXOR, supportsST: false, factory: func() Chunk { return NewXORChunk() }},
{encoding: EncXOROptST, supportsST: true, factory: func() Chunk { return NewXOROptSTChunk() }},
{encoding: EncXORST, supportsST: true, factory: func() Chunk { return NewXORSTChunk() }},
}
for _, tc := range testcases {
t.Run(fmt.Sprintf("%v", tc.encoding), func(t *testing.T) {
@ -144,7 +144,7 @@ func TestPool(t *testing.T) {
},
{
name: "xor opt st",
encoding: EncXOROptST,
encoding: EncXORST,
},
{
name: "invalid encoding",
@ -167,8 +167,8 @@ func TestPool(t *testing.T) {
b = &c.(*HistogramChunk).b
case EncFloatHistogram:
b = &c.(*FloatHistogramChunk).b
case EncXOROptST:
b = &c.(*XorOptSTChunk).b
case EncXORST:
b = &c.(*XorSTChunk).b
default:
b = &c.(*XORChunk).b
}

View file

@ -22,13 +22,9 @@ import (
const (
chunkSTHeaderSize = 1
maxFirstSTChangeOn = 0x7F
maxFirstSTChangeOn = 0xFF
)
func writeHeaderFirstSTKnown(b []byte) {
b[0] = 0x80
}
func writeHeaderFirstSTChangeOn(b []byte, firstSTChangeOn uint16) {
// First bit indicates the initial ST value.
// Here we save the sample number from where the first change occurs in the
@ -38,57 +34,46 @@ func writeHeaderFirstSTChangeOn(b []byte, firstSTChangeOn uint16) {
// This should never happen, would cause corruption (ST already skipped but shouldn't).
return
}
b[0] |= uint8(firstSTChangeOn)
b[0] = uint8(firstSTChangeOn)
}
func readSTHeader(b []byte) (firstSTKnown bool, firstSTChangeOn uint8) {
if b[0] == 0x00 {
return false, 0
}
if b[0] == 0x80 {
return true, 0
}
mask := byte(0x80)
if b[0]&mask != 0 {
firstSTKnown = true
}
mask = 0x7F
return firstSTKnown, b[0] & mask
func readSTHeader(b []byte) uint8 {
return b[0]
}
// XorOptSTChunk holds XOR enncoded samples with optional start time (ST)
// XorSTChunk holds XOR enncoded samples with optional start time (ST)
// per chunk or per sample. See tsdb/docs/format/chunks.md for details.
type XorOptSTChunk struct {
type XorSTChunk struct {
b bstream
}
// NewXOROptSTChunk returns a new chunk with XORv2 encoding.
func NewXOROptSTChunk() *XorOptSTChunk {
// NewXORSTChunk returns a new chunk with XORv2 encoding.
func NewXORSTChunk() *XorSTChunk {
b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize)
return &XorOptSTChunk{b: bstream{stream: b, count: 0}}
return &XorSTChunk{b: bstream{stream: b, count: 0}}
}
func (c *XorOptSTChunk) Reset(stream []byte) {
func (c *XorSTChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (*XorOptSTChunk) Encoding() Encoding {
return EncXOROptST
func (*XorSTChunk) Encoding() Encoding {
return EncXORST
}
// Bytes returns the underlying byte slice of the chunk.
func (c *XorOptSTChunk) Bytes() []byte {
func (c *XorSTChunk) Bytes() []byte {
return c.b.bytes()
}
// NumSamples returns the number of samples in the chunk.
func (c *XorOptSTChunk) NumSamples() int {
func (c *XorSTChunk) NumSamples() int {
return int(binary.BigEndian.Uint16(c.Bytes()))
}
// Compact implements the Chunk interface.
func (c *XorOptSTChunk) Compact() {
func (c *XorSTChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
buf := make([]byte, l)
copy(buf, c.b.stream)
@ -99,9 +84,9 @@ func (c *XorOptSTChunk) Compact() {
// Appender implements the Chunk interface.
// It is not valid to call Appender() multiple times concurrently or to use multiple
// Appenders on the same chunk.
func (c *XorOptSTChunk) Appender() (Appender, error) {
func (c *XorSTChunk) Appender() (Appender, error) {
if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty.
return &xorOptSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil
return &xorSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil
}
it := c.iterator(nil)
@ -114,7 +99,7 @@ func (c *XorOptSTChunk) Appender() (Appender, error) {
return nil, err
}
a := &xorOptSTAppender{
a := &xorSTAppender{
b: &c.b,
st: it.st,
t: it.t,
@ -125,16 +110,15 @@ func (c *XorOptSTChunk) Appender() (Appender, error) {
trailing: it.trailing,
numTotal: it.numTotal,
firstSTKnown: it.firstSTKnown,
firstSTChangeOn: uint16(it.firstSTChangeOn),
firstSTChangeOn: it.firstSTChangeOn,
}
return a, nil
}
func (c *XorOptSTChunk) iterator(it Iterator) *xorOptSTtIterator {
xorIter, ok := it.(*xorOptSTtIterator)
func (c *XorSTChunk) iterator(it Iterator) *xorSTIterator {
xorIter, ok := it.(*xorSTIterator)
if !ok {
xorIter = &xorOptSTtIterator{}
xorIter = &xorSTIterator{}
}
xorIter.Reset(c.b.bytes())
@ -145,41 +129,39 @@ func (c *XorOptSTChunk) iterator(it Iterator) *xorOptSTtIterator {
// Iterator() must not be called concurrently with any modifications to the chunk,
// but after it returns you can use an Iterator concurrently with an Appender or
// other Iterators.
func (c *XorOptSTChunk) Iterator(it Iterator) Iterator {
func (c *XorSTChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
type xorOptSTAppender struct {
type xorSTAppender struct {
b *bstream
numTotal uint16
firstSTChangeOn uint16
leading uint8
trailing uint8
firstSTKnown bool
st, t int64
v float64
stDelta int64
tDelta uint64
}
func (a *xorOptSTAppender) writeVDelta(v float64) {
func (a *xorSTAppender) writeVDelta(v float64) {
xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}
func (*xorOptSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
func (*xorSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float chunk")
}
func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
func (*xorSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a float chunk")
}
type xorOptSTtIterator struct {
type xorSTIterator struct {
br bstreamReader
numTotal uint16
firstSTKnown bool
firstSTChangeOn uint8
firstSTChangeOn uint16
leading uint8
trailing uint8
@ -193,7 +175,7 @@ type xorOptSTtIterator struct {
err error
}
func (it *xorOptSTtIterator) Seek(t int64) ValueType {
func (it *xorSTIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone
}
@ -206,35 +188,35 @@ func (it *xorOptSTtIterator) Seek(t int64) ValueType {
return ValFloat
}
func (it *xorOptSTtIterator) At() (int64, float64) {
func (it *xorSTIterator) At() (int64, float64) {
return it.t, it.val
}
func (*xorOptSTtIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
func (*xorSTIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
panic("cannot call xorIterator.AtHistogram")
}
func (*xorOptSTtIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
func (*xorSTIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("cannot call xorIterator.AtFloatHistogram")
}
func (it *xorOptSTtIterator) AtT() int64 {
func (it *xorSTIterator) AtT() int64 {
return it.t
}
func (it *xorOptSTtIterator) AtST() int64 {
func (it *xorSTIterator) AtST() int64 {
return it.st
}
func (it *xorOptSTtIterator) Err() error {
func (it *xorSTIterator) Err() error {
return it.err
}
func (it *xorOptSTtIterator) Reset(b []byte) {
func (it *xorSTIterator) Reset(b []byte) {
// We skip initial headers for actual samples.
it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:])
it.numTotal = binary.BigEndian.Uint16(b)
it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[chunkHeaderSize:])
it.firstSTChangeOn = uint16(readSTHeader(b[chunkHeaderSize:]))
it.numRead = 0
it.st = 0
it.t = 0
@ -246,74 +228,12 @@ func (it *xorOptSTtIterator) Reset(b []byte) {
it.err = nil
}
func (a *xorOptSTAppender) Append(st, t int64, v float64) {
if st == 0 && a.numTotal != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown {
// Fast path for no ST usage at all.
// Same as classic XOR chunk appender.
var tDelta uint64
switch a.numTotal {
case 0:
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, t)] {
a.b.writeByte(b)
}
a.b.writeBits(math.Float64bits(v), 64)
case 1:
buf := make([]byte, binary.MaxVarintLen64)
tDelta = uint64(t - a.t)
for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
a.b.writeByte(b)
}
a.writeVDelta(v)
default:
tDelta = uint64(t - a.t)
dod := int64(tDelta - a.tDelta)
// Gorilla has a max resolution of seconds, Prometheus milliseconds.
// Thus we use higher value range steps with larger bit size.
//
// TODO(beorn7): This seems to needlessly jump to large bit
// sizes even for very small deviations from zero. Timestamp
// compression can probably benefit from some smaller bit
// buckets. See also what was done for histogram encoding in
// varbit.go.
switch {
case dod == 0:
a.b.writeBit(zero)
case bitRange(dod, 14):
a.b.writeByte(0b10<<6 | (uint8(dod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod.
a.b.writeByte(uint8(dod)) // Bottom 8 bits of dod.
case bitRange(dod, 17):
a.b.writeBits(0b110, 3)
a.b.writeBits(uint64(dod), 17)
case bitRange(dod, 20):
a.b.writeBits(0b1110, 4)
a.b.writeBits(uint64(dod), 20)
default:
a.b.writeBits(0b1111, 4)
a.b.writeBits(uint64(dod), 64)
}
a.writeVDelta(v)
}
a.t = t
a.v = v
a.tDelta = tDelta
a.numTotal++
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
return
}
func (a *xorSTAppender) Append(st, t int64, v float64) {
var (
stDelta int64
tDelta uint64
)
// Slow path for ST usage.
switch a.numTotal {
case 0:
buf := make([]byte, binary.MaxVarintLen64)
@ -321,8 +241,6 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) {
for _, b := range buf[:binary.PutVarint(buf, st)] {
a.b.writeByte(b)
}
a.firstSTKnown = true
writeHeaderFirstSTKnown(a.b.bytes()[chunkHeaderSize:])
for _, b := range buf[:binary.PutVarint(buf, t)] {
a.b.writeByte(b)
@ -432,25 +350,23 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) {
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
}
func (it *xorOptSTtIterator) retErr(err error) ValueType {
func (it *xorSTIterator) retErr(err error) ValueType {
it.err = err
return ValNone
}
func (it *xorOptSTtIterator) Next() ValueType {
func (it *xorSTIterator) Next() ValueType {
if it.err != nil || it.numRead == it.numTotal {
return ValNone
}
if it.numRead == 0 {
// Optional ST read.
if it.firstSTKnown {
st, err := binary.ReadVarint(&it.br)
if err != nil {
return it.retErr(err)
}
it.st = st
// Always read ST for the first sample.
st, err := binary.ReadVarint(&it.br)
if err != nil {
return it.retErr(err)
}
it.st = st
t, err := binary.ReadVarint(&it.br)
if err != nil {
return it.retErr(err)
@ -486,8 +402,7 @@ func (it *xorOptSTtIterator) Next() ValueType {
return it.readValue()
}
if it.firstSTChangeOn > 0 && it.numRead >= uint16(it.firstSTChangeOn) {
// Inlined readClassicVarbitInt(&it.br)
if it.firstSTChangeOn > 0 && it.numRead >= it.firstSTChangeOn {
var d byte
// read delta-of-delta
for range 4 {
@ -606,7 +521,7 @@ func (it *xorOptSTtIterator) Next() ValueType {
return it.readValue()
}
func (it *xorOptSTtIterator) readValue() ValueType {
func (it *xorSTIterator) readValue() ValueType {
err := xorRead(&it.br, &it.val, &it.leading, &it.trailing)
if err != nil {
return it.retErr(err)

View file

@ -21,7 +21,7 @@ import (
func TestXorOptSTChunk(t *testing.T) {
testChunkSTHandling(t, ValFloat, func() Chunk {
return NewXOROptSTChunk()
return NewXORSTChunk()
},
)
}
@ -29,7 +29,7 @@ func TestXorOptSTChunk(t *testing.T) {
func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) {
const afterMax = maxFirstSTChangeOn + 3
t.Run("zero ST", func(t *testing.T) {
chunk := NewXOROptSTChunk()
chunk := NewXORSTChunk()
app, err := chunk.Appender()
require.NoError(t, err)
for i := range afterMax {
@ -51,7 +51,7 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) {
})
t.Run("non-zero ST after 127", func(t *testing.T) {
chunk := NewXOROptSTChunk()
chunk := NewXORSTChunk()
app, err := chunk.Appender()
require.NoError(t, err)
for i := range afterMax {
@ -83,26 +83,16 @@ func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) {
func TestXorOptSTChunk_STHeader(t *testing.T) {
b := make([]byte, 1)
writeHeaderFirstSTKnown(b)
firstSTKnown, firstSTChangeOn := readSTHeader(b)
require.True(t, firstSTKnown)
require.Equal(t, uint8(0), firstSTChangeOn)
b = make([]byte, 1)
firstSTKnown, firstSTChangeOn = readSTHeader(b)
require.False(t, firstSTKnown)
firstSTChangeOn := readSTHeader(b)
require.Equal(t, uint8(0), firstSTChangeOn)
b = make([]byte, 1)
writeHeaderFirstSTChangeOn(b, 1)
firstSTKnown, firstSTChangeOn = readSTHeader(b)
require.False(t, firstSTKnown)
firstSTChangeOn = readSTHeader(b)
require.Equal(t, uint8(1), firstSTChangeOn)
b = make([]byte, 1)
writeHeaderFirstSTKnown(b)
writeHeaderFirstSTChangeOn(b, 119)
firstSTKnown, firstSTChangeOn = readSTHeader(b)
require.True(t, firstSTKnown)
firstSTChangeOn = readSTHeader(b)
require.Equal(t, uint8(119), firstSTChangeOn)
}