Add type and unit in scrape loop; changed textparse Metric for it.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-02-12 14:57:40 +00:00
parent 2084ab6191
commit 193e2d27ec
11 changed files with 92 additions and 82 deletions

View file

@ -132,6 +132,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
ctx := context.Background()
app := w.Appender(ctx)
symbolTable := labels.NewSymbolTable() // One table per block means it won't grow too large.
builder := labels.NewScratchBuilderWithSymbolTable(symbolTable, 15)
p := textparse.NewOpenMetricsParser(input, symbolTable)
samplesCount := 0
for {
@ -148,8 +149,10 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
_, ts, v := p.Series()
if ts == nil {
l := labels.Labels{}
p.Metric(&l)
builder.Reset()
p.PopulateLabelsAndName(&builder)
builder.Sort()
l := builder.Labels()
return fmt.Errorf("expected timestamp for series %v, got none", l)
}
if *ts < t {
@ -162,8 +165,10 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
continue
}
l := labels.Labels{}
p.Metric(&l)
builder.Reset()
p.PopulateLabelsAndName(&builder)
builder.Sort()
l := builder.Labels()
lb.Reset(l)
for name, value := range customLabels {

View file

@ -162,16 +162,14 @@ func benchParse(b *testing.B, data []byte, parser string) {
b.Fatal("unknown parser", parser)
}
var (
res labels.Labels
e exemplar.Exemplar
)
var e exemplar.Exemplar
b.SetBytes(int64(len(data)))
b.ReportAllocs()
b.ResetTimer()
st := labels.NewSymbolTable()
lsetBuilder := labels.NewScratchBuilderWithSymbolTable(st, 15)
for i := 0; i < b.N; i++ {
p := newParserFn(data, st)
@ -202,8 +200,10 @@ func benchParse(b *testing.B, data []byte, parser string) {
default:
b.Fatal("not implemented entry", t)
}
_ = p.Metric(&res)
lsetBuilder.Reset()
p.PopulateLabelsAndName(&lsetBuilder)
lsetBuilder.Sort()
_ = lsetBuilder.Labels()
_ = p.CreatedTimestamp()
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
}

View file

@ -57,11 +57,8 @@ type Parser interface {
// The returned byte slice becomes invalid after the next call to Next.
Comment() []byte
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
// The values of the "le" labels of classic histograms and "quantile" labels
// of summaries should follow the OpenMetrics formatting rules.
Metric(l *labels.Labels) string
// PopulateLabelsAndName uses provided builder to add labels and metric name.
PopulateLabelsAndName(l *labels.ScratchBuilder)
// Exemplar writes the exemplar of the current sample into the passed
// exemplar. It can be called repeatedly to retrieve multiple exemplars

View file

@ -217,6 +217,7 @@ func requireEntries(t *testing.T, exp, got []parsedEntry) {
func testParse(t *testing.T, p Parser) (ret []parsedEntry) {
t.Helper()
lsetBuilder := labels.NewScratchBuilder(0)
for {
et, err := p.Next()
if errors.Is(err, io.EOF) {
@ -238,7 +239,11 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) {
got.m = string(m)
}
p.Metric(&got.lset)
lsetBuilder.Reset()
p.PopulateLabelsAndName(&lsetBuilder)
lsetBuilder.Sort()
got.lset = lsetBuilder.Labels()
// Parser reuses int pointer.
if ct := p.CreatedTimestamp(); ct != nil {
got.ct = int64p(*ct)

View file

@ -67,8 +67,7 @@ type NHCBParser struct {
h *histogram.Histogram
fh *histogram.FloatHistogram
// For Metric.
lset labels.Labels
metricString string
lset labels.Labels
// For Type.
bName []byte
typ model.MetricType
@ -141,13 +140,17 @@ func (p *NHCBParser) Comment() []byte {
return p.parser.Comment()
}
func (p *NHCBParser) Metric(l *labels.Labels) string {
func (p *NHCBParser) PopulateLabelsAndName(l *labels.ScratchBuilder) {
// TODO(bwplotka): Optimize this.
if p.state == stateEmitting {
*l = p.lsetNHCB
return p.metricStringNHCB
p.lsetNHCB.Range(func(label labels.Label) {
l.Add(label.Name, label.Value)
})
return
}
*l = p.lset
return p.metricString
p.lset.Range(func(label labels.Label) {
l.Add(label.Name, label.Value)
})
}
func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
@ -200,7 +203,10 @@ func (p *NHCBParser) Next() (Entry, error) {
switch p.entry {
case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series()
p.metricString = p.parser.Metric(&p.lset)
p.builder.Reset()
p.parser.PopulateLabelsAndName(&p.builder)
p.lset = p.builder.Labels()
// Check the label set to see if we can continue or need to emit the NHCB.
var isNHCB bool
if p.compareLabels() {
@ -224,7 +230,10 @@ func (p *NHCBParser) Next() (Entry, error) {
return p.entry, p.err
case EntryHistogram:
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.metricString = p.parser.Metric(&p.lset)
p.builder.Reset()
p.parser.PopulateLabelsAndName(&p.builder)
p.lset = p.builder.Labels()
p.storeExponentialLabels()
case EntryType:
p.bName, p.typ = p.parser.Type()

View file

@ -197,15 +197,11 @@ func (p *OpenMetricsParser) Comment() []byte {
return p.text
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
// Copy the buffer to a string: this is only necessary for the return value.
s := string(p.series)
func (p *OpenMetricsParser) PopulateLabelsAndName(l *labels.ScratchBuilder) {
s := yoloString(p.series)
p.builder.Reset()
metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start])
p.builder.Add(labels.MetricName, metricName)
l.Add(labels.MetricName, metricName)
for i := 2; i < len(p.offsets); i += 4 {
a := p.offsets[i] - p.start
@ -215,13 +211,8 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
d := p.offsets[i+3] - p.start
value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d]))
p.builder.Add(label, value)
l.Add(label, value)
}
p.builder.Sort()
*l = p.builder.Labels()
return s
}
// Exemplar writes the exemplar of the current sample into the passed exemplar.

View file

@ -147,15 +147,14 @@ func (l *promlexer) Error(es string) {
// PromParser parses samples from a byte slice of samples in the official
// Prometheus text exposition format.
type PromParser struct {
l *promlexer
builder labels.ScratchBuilder
series []byte
text []byte
mtype model.MetricType
val float64
ts int64
hasTS bool
start int
l *promlexer
series []byte
text []byte
mtype model.MetricType
val float64
ts int64
hasTS bool
start int
// offsets is a list of offsets into series that describe the positions
// of the metric name and label names and values for this series.
// p.offsets[0] is the start character of the metric name.
@ -166,10 +165,9 @@ type PromParser struct {
}
// NewPromParser returns a new parser of the byte slice.
func NewPromParser(b []byte, st *labels.SymbolTable) Parser {
func NewPromParser(b []byte, _ *labels.SymbolTable) Parser {
return &PromParser{
l: &promlexer{b: append(b, '\n')},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
l: &promlexer{b: append(b, '\n')},
}
}
@ -223,15 +221,11 @@ func (p *PromParser) Comment() []byte {
return p.text
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *PromParser) Metric(l *labels.Labels) string {
// Copy the buffer to a string: this is only necessary for the return value.
s := string(p.series)
func (p *PromParser) PopulateLabelsAndName(l *labels.ScratchBuilder) {
s := yoloString(p.series)
p.builder.Reset()
metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start])
p.builder.Add(labels.MetricName, metricName)
l.Add(labels.MetricName, metricName)
for i := 2; i < len(p.offsets); i += 4 {
a := p.offsets[i] - p.start
@ -241,13 +235,8 @@ func (p *PromParser) Metric(l *labels.Labels) string {
d := p.offsets[i+3] - p.start
value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d]))
p.builder.Add(label, value)
l.Add(label, value)
}
p.builder.Sort()
*l = p.builder.Labels()
return s
}
// Exemplar implements the Parser interface. However, since the classic

View file

@ -296,24 +296,14 @@ func (p *ProtobufParser) Comment() []byte {
return nil
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *ProtobufParser) Metric(l *labels.Labels) string {
p.builder.Reset()
p.builder.Add(labels.MetricName, p.getMagicName())
func (p *ProtobufParser) PopulateLabelsAndName(l *labels.ScratchBuilder) {
l.Add(labels.MetricName, p.getMagicName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
p.builder.Add(lp.GetName(), lp.GetValue())
l.Add(lp.GetName(), lp.GetValue())
}
if needed, name, value := p.getMagicLabel(); needed {
p.builder.Add(name, value)
l.Add(name, value)
}
// Sort labels to maintain the sorted labels invariant.
p.builder.Sort()
*l = p.builder.Labels()
return p.metricBytes.String()
}
// Exemplar writes the exemplar of the current sample into the passed

View file

@ -1629,6 +1629,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
appErrs = appendErrors{}
sampleLimitErr error
bucketLimitErr error
builder = labels.NewScratchBuilderWithSymbolTable(sl.symbolTable, 15)
lset labels.Labels // escapes to heap so hoisted out of loop
e exemplar.Exemplar // escapes to heap so hoisted out of loop
lastMeta *metaEntry
@ -1714,7 +1715,23 @@ loop:
lset = ce.lset
hash = ce.hash
} else {
p.Metric(&lset)
// NOTE(bwplotka): Naive injection of type and unit as labels, without a
// feature flag. This is to measure initial overhead of this approach.
// Long term we might want to consider similar semantics of type and unit
// being part of strict series, however stored outside of labels e.g. in
// the new model/series.Series struct that wraps labels.
builder.Reset()
if lastMeta != nil {
// Is it new series OR did metadata change for this family?
if lastMeta.lastIterChange == sl.cache.iter {
builder.Add(labels.MetricType, string(lastMeta.Type))
builder.Add(labels.MetricUnit, lastMeta.Unit)
}
}
p.PopulateLabelsAndName(&builder)
builder.Sort()
lset = builder.Labels()
hash = lset.Hash()
// Hash label set as it is seen local to the target. Then add target labels

View file

@ -1986,9 +1986,12 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
require.NotNil(t, p)
require.NoError(t, warning)
var lset labels.Labels
builder := labels.NewScratchBuilder(15)
p.Next()
p.Metric(&lset)
builder.Reset()
p.PopulateLabelsAndName(&builder)
builder.Sort()
lset := builder.Labels()
hash := lset.Hash()
// Create a fake entry in the cache

View file

@ -395,15 +395,19 @@ func TestFederationWithNativeHistograms(t *testing.T) {
p := textparse.NewProtobufParser(body, false, labels.NewSymbolTable())
var actVec promql.Vector
metricFamilies := 0
l := labels.Labels{}
lb := labels.NewScratchBuilder(15)
for {
et, err := p.Next()
if err != nil && errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
var l labels.Labels
if et == textparse.EntryHistogram || et == textparse.EntrySeries {
p.Metric(&l)
lb.Reset()
p.PopulateLabelsAndName(&lb)
lb.Sort()
l = lb.Labels()
}
switch et {
case textparse.EntryHelp: