mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-10 17:10:12 -04:00
Refactor to use generic appenders
Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
This commit is contained in:
parent
09c8a1d7d8
commit
fdebcdb40f
7 changed files with 54 additions and 123 deletions
|
|
@ -170,8 +170,6 @@ func (c *FloatHistogramChunk) Iterator(it Iterator) Iterator {
|
|||
type FloatHistogramAppender struct {
|
||||
b *bstream
|
||||
|
||||
headerLayout histogramHeaderLayout
|
||||
|
||||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
|
|
@ -183,32 +181,16 @@ type FloatHistogramAppender struct {
|
|||
pBuckets, nBuckets []xorValue
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) counterResetHeaderPos() int {
|
||||
if a.headerLayout == histogramHeaderST {
|
||||
return 0
|
||||
}
|
||||
return histogramFlagPos
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) sampleCountMask() uint16 {
|
||||
if a.headerLayout == histogramHeaderST {
|
||||
return 0x3FFF
|
||||
}
|
||||
return 0xFFFF
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
||||
return CounterResetHeader(a.b.bytes()[a.counterResetHeaderPos()] & CounterResetHeaderMask)
|
||||
return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
|
||||
b := a.b.bytes()
|
||||
pos := a.counterResetHeaderPos()
|
||||
b[pos] = (b[pos] &^ CounterResetHeaderMask) | (byte(cr) & CounterResetHeaderMask)
|
||||
a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) NumSamples() int {
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()) & a.sampleCountMask())
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()))
|
||||
}
|
||||
|
||||
// setNumSamples writes the float histogram sample count into the chunk header.
|
||||
|
|
|
|||
|
|
@ -79,12 +79,11 @@ func (c *FloatHistogramSTChunk) Appender() (Appender, error) {
|
|||
if len(c.b.stream) == histogramHeaderSize {
|
||||
return &FloatHistogramSTAppender{
|
||||
FloatHistogramAppender: FloatHistogramAppender{
|
||||
b: &c.b,
|
||||
headerLayout: histogramHeaderST,
|
||||
t: math.MinInt64,
|
||||
sum: xorValue{leading: 0xff},
|
||||
cnt: xorValue{leading: 0xff},
|
||||
zCnt: xorValue{leading: 0xff},
|
||||
b: &c.b,
|
||||
t: math.MinInt64,
|
||||
sum: xorValue{leading: 0xff},
|
||||
cnt: xorValue{leading: 0xff},
|
||||
zCnt: xorValue{leading: 0xff},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
|
@ -119,8 +118,7 @@ func (c *FloatHistogramSTChunk) Appender() (Appender, error) {
|
|||
|
||||
a := &FloatHistogramSTAppender{
|
||||
FloatHistogramAppender: FloatHistogramAppender{
|
||||
b: &c.b,
|
||||
headerLayout: histogramHeaderST,
|
||||
b: &c.b,
|
||||
|
||||
schema: it.schema,
|
||||
zThreshold: it.zThreshold,
|
||||
|
|
@ -189,6 +187,12 @@ func (a *FloatHistogramSTAppender) setCounterResetHeader(cr CounterResetHeader)
|
|||
b[0] = (b[0] &^ CounterResetHeaderMask) | (byte(cr) & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
// NumSamples returns the number of samples in the chunk. Since the counter-reset header
|
||||
// is in the top 2 bits of the sample count word, so samples count occupies only the low 14 bits.
|
||||
func (a *FloatHistogramSTAppender) NumSamples() int {
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()) & histogramSTSampleCountMask)
|
||||
}
|
||||
|
||||
func (a *FloatHistogramSTAppender) appendable(h *histogram.FloatHistogram) (
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
|
|
@ -403,15 +407,18 @@ func (*FloatHistogramSTAppender) Append(int64, int64, float64) {
|
|||
|
||||
// AppendHistogram implements Appender. This implementation panics because integer
|
||||
// histogram samples must never be appended to a float histogram chunk.
|
||||
func (*FloatHistogramSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
|
||||
func (*FloatHistogramSTAppender) AppendHistogram(Appender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
|
||||
panic("appended a histogram sample to a float histogram chunk")
|
||||
}
|
||||
|
||||
// AppendFloatHistogram implements Appender for FloatHistogramSTAppender.
|
||||
func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev *FloatHistogramAppender, st, t int64, fh *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
||||
func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int64, fh *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
||||
numSamples := a.NumSamples()
|
||||
|
||||
if numSamples == int(a.sampleCountMask()) {
|
||||
// ST chunks store the sample count in the low 14 bits (the high 2 are the
|
||||
// counter-reset header), so the capacity is histogramSTSampleCountMask rather
|
||||
// than math.MaxUint16.
|
||||
if numSamples == histogramSTSampleCountMask {
|
||||
panic("chunk capacity exceeded")
|
||||
}
|
||||
|
||||
|
|
@ -426,11 +433,13 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev *FloatHistogramAppe
|
|||
case fh.CounterResetHint == histogram.CounterReset:
|
||||
a.setCounterResetHeader(CounterReset)
|
||||
case prev != nil:
|
||||
_, _, _, _, _, counterReset := prev.appendable(fh)
|
||||
if counterReset {
|
||||
a.setCounterResetHeader(CounterReset)
|
||||
} else {
|
||||
a.setCounterResetHeader(NotCounterReset)
|
||||
if p, ok := prev.(floatHistogramAppendable); ok {
|
||||
_, _, _, _, _, counterReset := p.appendable(fh)
|
||||
if counterReset {
|
||||
a.setCounterResetHeader(CounterReset)
|
||||
} else {
|
||||
a.setCounterResetHeader(NotCounterReset)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, false, a, nil
|
||||
|
|
|
|||
|
|
@ -548,36 +548,12 @@ func TestFloatHistogramSTChunk_CounterResetHeader(t *testing.T) {
|
|||
for _, cr := range []CounterResetHeader{UnknownCounterReset, CounterReset, NotCounterReset, GaugeType} {
|
||||
happ.setCounterResetHeader(cr)
|
||||
require.Equal(t, cr, c.GetCounterResetHeader())
|
||||
require.Equal(t, cr, happ.FloatHistogramAppender.GetCounterResetHeader())
|
||||
require.Equal(t, 1, c.NumSamples(), "NumSamples must not include CR bits")
|
||||
require.Equal(t, 1, happ.NumSamples(), "NumSamples must not include CR bits")
|
||||
require.Equal(t, byte2Before, c.Bytes()[2], "setting CR must not disturb byte 2 (ST header)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFloatHistogramSTAppenderPreviousEmbeddedAppenderUsesSTHeader(t *testing.T) {
|
||||
prevChunk := NewFloatHistogramSTChunk()
|
||||
prevApp, err := prevChunk.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
fh1 := tsdbutil.GenerateTestFloatHistogram(10)
|
||||
_, _, prevApp, err = prevApp.AppendFloatHistogram(nil, 100, 1000, fh1, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
prevSTApp := prevApp.(*FloatHistogramSTAppender)
|
||||
prevSTApp.setCounterResetHeader(NotCounterReset)
|
||||
prevChunk.Bytes()[2] = byte(GaugeType)
|
||||
|
||||
nextChunk := NewFloatHistogramSTChunk()
|
||||
nextApp, err := nextChunk.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
fh2 := tsdbutil.GenerateTestFloatHistogram(0)
|
||||
_, _, _, err = nextApp.AppendFloatHistogram(&prevSTApp.FloatHistogramAppender, 200, 2000, fh2, false)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, CounterReset, nextChunk.GetCounterResetHeader())
|
||||
}
|
||||
|
||||
func TestFloatHistogramSTChunkOverFlowPanics(t *testing.T) {
|
||||
testChunkOverFlowPanics(t, EncFloatHistogramST, ValFloatHistogram)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -178,18 +178,10 @@ func (c *HistogramChunk) Iterator(it Iterator) Iterator {
|
|||
return c.iterator(it)
|
||||
}
|
||||
|
||||
type histogramHeaderLayout uint8
|
||||
|
||||
const (
|
||||
histogramHeaderST histogramHeaderLayout = 1
|
||||
)
|
||||
|
||||
// HistogramAppender is an Appender implementation for sparse histograms.
|
||||
type HistogramAppender struct {
|
||||
b *bstream
|
||||
|
||||
headerLayout histogramHeaderLayout
|
||||
|
||||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
|
|
@ -213,32 +205,16 @@ type HistogramAppender struct {
|
|||
trailing uint8
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) counterResetHeaderPos() int {
|
||||
if a.headerLayout == histogramHeaderST {
|
||||
return 0
|
||||
}
|
||||
return histogramFlagPos
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) sampleCountMask() uint16 {
|
||||
if a.headerLayout == histogramHeaderST {
|
||||
return 0x3FFF
|
||||
}
|
||||
return 0xFFFF
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
||||
return CounterResetHeader(a.b.bytes()[a.counterResetHeaderPos()] & CounterResetHeaderMask)
|
||||
return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
|
||||
b := a.b.bytes()
|
||||
pos := a.counterResetHeaderPos()
|
||||
b[pos] = (b[pos] &^ CounterResetHeaderMask) | (byte(cr) & CounterResetHeaderMask)
|
||||
a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) NumSamples() int {
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()) & a.sampleCountMask())
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()))
|
||||
}
|
||||
|
||||
// setNumSamples writes the histogram sample count into the chunk header.
|
||||
|
|
|
|||
|
|
@ -78,10 +78,9 @@ func (c *HistogramSTChunk) Appender() (Appender, error) {
|
|||
if len(c.b.stream) == histogramHeaderSize {
|
||||
return &HistogramSTAppender{
|
||||
HistogramAppender: HistogramAppender{
|
||||
b: &c.b,
|
||||
headerLayout: histogramHeaderST,
|
||||
t: math.MinInt64,
|
||||
leading: 0xff,
|
||||
b: &c.b,
|
||||
t: math.MinInt64,
|
||||
leading: 0xff,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
|
@ -100,8 +99,7 @@ func (c *HistogramSTChunk) Appender() (Appender, error) {
|
|||
|
||||
a := &HistogramSTAppender{
|
||||
HistogramAppender: HistogramAppender{
|
||||
b: &c.b,
|
||||
headerLayout: histogramHeaderST,
|
||||
b: &c.b,
|
||||
|
||||
schema: it.schema,
|
||||
zThreshold: it.zThreshold,
|
||||
|
|
@ -177,6 +175,12 @@ func (a *HistogramSTAppender) setCounterResetHeader(cr CounterResetHeader) {
|
|||
b[0] = (b[0] &^ CounterResetHeaderMask) | (byte(cr) & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
// NumSamples returns the number of samples in the chunk. Since the counter-reset header
|
||||
// is in the top 2 bits of the sample count word, so samples count occupies only the low 14 bits.
|
||||
func (a *HistogramSTAppender) NumSamples() int {
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()) & histogramSTSampleCountMask)
|
||||
}
|
||||
|
||||
func (a *HistogramSTAppender) appendable(h *histogram.Histogram) (
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
|
|
@ -406,15 +410,18 @@ func (*HistogramSTAppender) Append(int64, int64, float64) {
|
|||
panic("appended a float sample to a histogram chunk")
|
||||
}
|
||||
|
||||
func (*HistogramSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
|
||||
func (*HistogramSTAppender) AppendFloatHistogram(Appender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
|
||||
panic("appended a float histogram sample to a histogram chunk")
|
||||
}
|
||||
|
||||
// AppendHistogram implements Appender for HistogramSTAppender.
|
||||
func (a *HistogramSTAppender) AppendHistogram(prev *HistogramAppender, st, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
||||
func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
||||
numSamples := a.NumSamples()
|
||||
|
||||
if numSamples == int(a.sampleCountMask()) {
|
||||
// ST chunks store the sample count in the low 14 bits (the high 2 are the
|
||||
// counter-reset header), so the capacity is histogramSTSampleCountMask rather
|
||||
// than math.MaxUint16.
|
||||
if numSamples == histogramSTSampleCountMask {
|
||||
panic("chunk capacity exceeded")
|
||||
}
|
||||
|
||||
|
|
@ -429,8 +436,10 @@ func (a *HistogramSTAppender) AppendHistogram(prev *HistogramAppender, st, t int
|
|||
case h.CounterResetHint == histogram.CounterReset:
|
||||
a.setCounterResetHeader(CounterReset)
|
||||
case prev != nil:
|
||||
_, _, _, _, _, counterReset := prev.appendable(h)
|
||||
a.setCounterResetHeader(counterReset)
|
||||
if p, ok := prev.(histogramAppendable); ok {
|
||||
_, _, _, _, _, counterReset := p.appendable(h)
|
||||
a.setCounterResetHeader(counterReset)
|
||||
}
|
||||
}
|
||||
return nil, false, a, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -545,36 +545,12 @@ func TestHistogramSTChunk_CounterResetHeader(t *testing.T) {
|
|||
for _, cr := range []CounterResetHeader{UnknownCounterReset, CounterReset, NotCounterReset, GaugeType} {
|
||||
happ.setCounterResetHeader(cr)
|
||||
require.Equal(t, cr, c.GetCounterResetHeader())
|
||||
require.Equal(t, cr, happ.HistogramAppender.GetCounterResetHeader())
|
||||
require.Equal(t, 1, c.NumSamples())
|
||||
require.Equal(t, 1, happ.NumSamples())
|
||||
require.Equal(t, byte2Before, c.Bytes()[2])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistogramSTAppenderPreviousEmbeddedAppenderUsesSTHeader(t *testing.T) {
|
||||
prevChunk := NewHistogramSTChunk()
|
||||
prevApp, err := prevChunk.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
h1 := tsdbutil.GenerateTestHistogram(10)
|
||||
_, _, prevApp, err = prevApp.AppendHistogram(nil, 100, 1000, h1, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
prevSTApp := prevApp.(*HistogramSTAppender)
|
||||
prevSTApp.setCounterResetHeader(NotCounterReset)
|
||||
prevChunk.Bytes()[2] = byte(GaugeType)
|
||||
|
||||
nextChunk := NewHistogramSTChunk()
|
||||
nextApp, err := nextChunk.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
h2 := tsdbutil.GenerateTestHistogram(0)
|
||||
_, _, _, err = nextApp.AppendHistogram(&prevSTApp.HistogramAppender, 200, 2000, h2, false)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, CounterReset, nextChunk.GetCounterResetHeader())
|
||||
}
|
||||
|
||||
func TestHistogramSTChunkOverFlowPanics(t *testing.T) {
|
||||
testChunkOverFlowPanics(t, EncHistogramST, ValHistogram)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@ import "encoding/binary"
|
|||
|
||||
const (
|
||||
maxFirstSTChangeOn = 0x7F
|
||||
// histogramSTSampleCountMask masks the low 14 bits of the histogram ST chunk's sample
|
||||
// count — the high 2 bits hold the counter-reset header.
|
||||
histogramSTSampleCountMask = 0x3FFF
|
||||
)
|
||||
|
||||
type stEncoder struct {
|
||||
|
|
|
|||
Loading…
Reference in a new issue