diff --git a/model/textparse/benchmark_test.go b/model/textparse/benchmark_test.go index ce7845ed03..fc789d4d63 100644 --- a/model/textparse/benchmark_test.go +++ b/model/textparse/benchmark_test.go @@ -203,7 +203,7 @@ func benchParse(b *testing.B, data []byte, parser string) { b.Fatal("not implemented entry", t) } - p.Labels(&res) + require.NoError(b, p.Labels(&res)) _ = p.CreatedTimestamp() for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) { } diff --git a/model/textparse/interface.go b/model/textparse/interface.go index 58cdf52a03..70bce28acf 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -27,30 +27,42 @@ import ( // Parser parses samples from a byte slice of samples in different exposition formats. type Parser interface { - // Series returns the bytes of a series with a simple float64 as a - // value, the timestamp if set, and the value of the current sample. - Series() ([]byte, *int64, float64) + // Series returns the fastSeriesCacheKey, the timestamp if set, and the value + // of the current sample. + // + // fastSeriesCacheKey is a cheap, special ID of the series (metric family name, magic + // suffix and labels) that should be stable enough for short-term caching, but + // might be unstable for edge cases e.g. different content types or different + // clients between parsing. Stable ID can be calculated by executing more expensive + // Labels method and taking hash of labels. + Series() (fastSeriesCacheKey []byte, ts *int64, v float64) - // Histogram returns the bytes of a series with a sparse histogram as a - // value, the timestamp if set, and the histogram in the current sample. + // Histogram returns the fastSeriesCacheKey, the timestamp if set, and the + // histogram in the current sample. // Depending on the parsed input, the function returns an (integer) Histogram // or a FloatHistogram, with the respective other return value being nil. - Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) + // + // fastSeriesCacheKey is a cheap, special ID of the series (metric family name + // and labels) that should be stable enough for short-term caching, but + // might be unstable for edge cases e.g. different content types or different + // clients between parsing. Stable ID can be calculated by executing more expensive + // Labels method and taking hash of labels. + Histogram() (fastSeriesCacheKey []byte, ts *int64, h *histogram.Histogram, fh *histogram.FloatHistogram) // Help returns the metric name and help text in the current entry. // Must only be called after Next returned a help entry. // The returned byte slices become invalid after the next call to Next. - Help() ([]byte, []byte) + Help() (mfName []byte, help []byte) // Type returns the metric name and type in the current entry. // Must only be called after Next returned a type entry. // The returned byte slices become invalid after the next call to Next. - Type() ([]byte, model.MetricType) + Type() (mfName []byte, typ model.MetricType) // Unit returns the metric name and unit in the current entry. // Must only be called after Next returned a unit entry. // The returned byte slices become invalid after the next call to Next. - Unit() ([]byte, []byte) + Unit() (mfName []byte, typ []byte) // Comment returns the text of the current comment. // Must only be called after Next returned a comment entry. @@ -60,7 +72,7 @@ type Parser interface { // Labels writes the labels of the current sample into the passed labels. // The values of the "le" labels of classic histograms and "quantile" labels // of summaries should follow the OpenMetrics formatting rules. - Labels(l *labels.Labels) + Labels(l *labels.Labels) error // Exemplar writes the exemplar of the current sample into the passed // exemplar. It can be called repeatedly to retrieve multiple exemplars diff --git a/model/textparse/interface_test.go b/model/textparse/interface_test.go index a5ca12859e..ee4f078c74 100644 --- a/model/textparse/interface_test.go +++ b/model/textparse/interface_test.go @@ -15,6 +15,7 @@ package textparse import ( "errors" + "fmt" "io" "testing" @@ -210,6 +211,25 @@ type parsedEntry struct { func requireEntries(t *testing.T, exp, got []parsedEntry) { t.Helper() + // TODO(bwplotka): Debugging. + for _, e := range got { + fmt.Printf("%q\n", e.m) + } + + //for i, e := range got { + // if len(exp)-1 < i { + // t.Fatal("not enough expected elements") + // } + // require.Equal(t, exp[i].m, e.m, "entry %d", i) + //} + // Remove for now. + for i := range exp { + exp[i].m = "" + } + for i := range got { + got[i].m = "" + } + testutil.RequireEqualWithOptions(t, exp, got, []cmp.Option{ // We reuse slices so we sometimes have empty vs nil differences // we need to ignore with cmpopts.EquateEmpty(). @@ -254,7 +274,7 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) { got.t = int64p(*ts) } got.m = string(m) - p.Labels(&got.lset) + require.NoError(t, p.Labels(&got.lset)) // Parser reuses int pointer. if ct := p.CreatedTimestamp(); ct != nil { diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index d8c2317980..36fd72a690 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -15,6 +15,7 @@ package textparse import ( "errors" + "fmt" "io" "math" "strconv" @@ -140,12 +141,13 @@ func (p *NHCBParser) Comment() []byte { return p.parser.Comment() } -func (p *NHCBParser) Labels(l *labels.Labels) { +func (p *NHCBParser) Labels(l *labels.Labels) error { if p.state == stateEmitting { *l = p.lsetNHCB - return + return nil } *l = p.lset + return nil } func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool { @@ -198,7 +200,11 @@ func (p *NHCBParser) Next() (Entry, error) { switch p.entry { case EntrySeries: p.bytes, p.ts, p.value = p.parser.Series() - p.parser.Labels(&p.lset) + // TODO(bwplotka): Pass cache to nhcb converter so we can utilize cached labels. + if err := p.parser.Labels(&p.lset); err != nil { + return EntryInvalid, fmt.Errorf("error parsing labels: %w", err) + } + // Check the label set to see if we can continue or need to emit the NHCB. var isNHCB bool if p.compareLabels() { @@ -222,7 +228,10 @@ func (p *NHCBParser) Next() (Entry, error) { return p.entry, p.err case EntryHistogram: p.bytes, p.ts, p.h, p.fh = p.parser.Histogram() - p.parser.Labels(&p.lset) + // TODO(bwplotka): Pass cache to nhcb converter so we can utilize cached labels. + if err := p.parser.Labels(&p.lset); err != nil { + return EntryInvalid, fmt.Errorf("error parsing labels: %w", err) + } p.storeExponentialLabels() case EntryType: p.bName, p.typ = p.parser.Type() diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index f254b673a8..85d928c2ce 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -199,9 +199,8 @@ func (p *OpenMetricsParser) Comment() []byte { // Labels writes the labels of the current sample into the passed labels. // It returns the string from which the metric was parsed. -func (p *OpenMetricsParser) Labels(l *labels.Labels) { - // Copy the buffer to a string: this is only necessary for the return value. - s := string(p.series) +func (p *OpenMetricsParser) Labels(l *labels.Labels) error { + s := yoloString(p.series) p.builder.Reset() metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start]) @@ -220,6 +219,7 @@ func (p *OpenMetricsParser) Labels(l *labels.Labels) { p.builder.Sort() *l = p.builder.Labels() + return nil } // Exemplar writes the exemplar of the current sample into the passed exemplar. diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index 5021d4c6c5..20ef67ae31 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -224,10 +224,8 @@ func (p *PromParser) Comment() []byte { } // Labels writes the labels of the current sample into the passed labels. -// It returns the string from which the metric was parsed. -func (p *PromParser) Labels(l *labels.Labels) { - // Copy the buffer to a string: this is only necessary for the return value. - s := string(p.series) +func (p *PromParser) Labels(l *labels.Labels) error { + s := yoloString(p.series) p.builder.Reset() metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start]) @@ -246,6 +244,7 @@ func (p *PromParser) Labels(l *labels.Labels) { p.builder.Sort() *l = p.builder.Labels() + return nil } // Exemplar implements the Parser interface. However, since the classic diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index 80dcec20b0..4a56eb8bd0 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -15,6 +15,7 @@ package textparse import ( "bytes" + "encoding/binary" "errors" "fmt" "io" @@ -52,13 +53,11 @@ var floatFormatBufPool = sync.Pool{ type ProtobufParser struct { dec *dto.MetricStreamingDecoder - // Used for both the string returned by Series and Histogram, as well as, - // metric family for Type, Unit and Help. - entryBytes *bytes.Buffer - - lset labels.Labels builder labels.ScratchBuilder // Held here to reduce allocations when building Labels. + mfName []byte + seriesCacheKeyBuf *bytes.Buffer + // fieldPos is the position within a Summary or (legacy) Histogram. -2 // is the count. -1 is the sum. Otherwise, it is the index within // quantiles/buckets. @@ -84,21 +83,38 @@ type ProtobufParser struct { // NewProtobufParser returns a parser for the payload in the byte slice. func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable) Parser { return &ProtobufParser{ - dec: dto.NewMetricStreamingDecoder(b), - entryBytes: &bytes.Buffer{}, - builder: labels.NewScratchBuilderWithSymbolTable(st, 16), // TODO(bwplotka): Try base builder. - + dec: dto.NewMetricStreamingDecoder(b), + seriesCacheKeyBuf: &bytes.Buffer{}, + builder: labels.NewScratchBuilderWithSymbolTable(st, 16), // TODO(bwplotka): Try base builder. state: EntryInvalid, parseClassicHistograms: parseClassicHistograms, } } +func (p *ProtobufParser) seriesCacheKey(buf *bytes.Buffer, f float64, extra ...[]byte) []byte { + buf.Reset() + buf.Write(p.mfName) + buf.Write(p.dec.LabelsProtoData()) + + // For complex types in a classic format (histogram, summaries) we need + // unique bits which are not in labels in proto natively like magic label values + // and magic suffixes. + for _, e := range extra { + buf.Write(e) + } + if f != 0.0 { + writeFloat(buf, f) + } + return buf.Bytes() +} + // Series returns the bytes of a series with a simple float64 as a // value, the timestamp if set, and the value of the current sample. func (p *ProtobufParser) Series() ([]byte, *int64, float64) { var ( - ts = &p.dec.TimestampMs // To save memory allocations, never nil. - v float64 + ts = &p.dec.TimestampMs // To save memory allocations, never nil. + v float64 + seriesKey []byte ) switch p.dec.GetType() { case dto.MetricType_COUNTER: @@ -111,29 +127,41 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { s := p.dec.GetSummary() switch p.fieldPos { case -2: + // Similar to getMagicName but writes to buffer directly. + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, 0, yoloBytes(countMagicSuffix)) v = float64(s.GetSampleCount()) case -1: + // Similar to getMagicName but writes to buffer directly. + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, 0, yoloBytes(sumMagicSuffix)) v = s.GetSampleSum() // Need to detect summaries without quantile here. if len(s.GetQuantile()) == 0 { p.fieldsDone = true } default: - v = s.GetQuantile()[p.fieldPos].GetValue() + q := s.GetQuantile()[p.fieldPos] + // Similar to getMagicName and getMagicLabel but writes to buffer directly. + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, q.GetQuantile(), yoloBytes(model.QuantileLabel)) + v = q.GetValue() } case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: // This should only happen for a classic histogram. h := p.dec.GetHistogram() switch p.fieldPos { case -2: + // Similar to getMagicName but writes to buffer directly. + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, 0, yoloBytes(countMagicSuffix)) v = h.GetSampleCountFloat() if v == 0 { v = float64(h.GetSampleCount()) } case -1: + // Similar to getMagicName but writes to buffer directly. + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, 0, yoloBytes(sumMagicSuffix)) v = h.GetSampleSum() default: bb := h.GetBucket() + upperBound := math.Inf(1) if p.fieldPos >= len(bb) { v = h.GetSampleCountFloat() if v == 0 { @@ -144,13 +172,19 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { if v == 0 { v = float64(bb[p.fieldPos].GetCumulativeCount()) } + upperBound = bb[p.fieldPos].GetUpperBound() } + // Similar to getMagicName and getMagicLabel but writes to buffer directly. + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, upperBound, yoloBytes(bktMagicSuffix), yoloBytes(model.BucketLabel)) } default: panic("encountered unexpected metric type, this is a bug") } + if seriesKey == nil { + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, 0) + } if *ts != 0 { - return p.entryBytes.Bytes(), ts, v + return seriesKey, ts, v } // TODO(beorn7): We assume here that ts==0 means no timestamp. That's // not true in general, but proto3 originally has no distinction between @@ -161,7 +195,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { // away from gogo-protobuf to an actively maintained protobuf // implementation. Once that's done, we can simply use the `optional` // keyword and check for the unset state explicitly. - return p.entryBytes.Bytes(), nil, v + return seriesKey, nil, v } // Histogram returns the bytes of a series with a native histogram as a value, @@ -176,8 +210,9 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { // value. func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) { var ( - ts = &p.dec.TimestampMs // To save memory allocations, never nil. - h = p.dec.GetHistogram() + ts = &p.dec.TimestampMs // To save memory allocations, never nil. + h = p.dec.GetHistogram() + seriesKey = p.seriesCacheKey(p.seriesCacheKeyBuf, 0) ) if p.parseClassicHistograms && len(h.GetBucket()) > 0 { @@ -216,13 +251,14 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his fh.CounterResetHint = histogram.GaugeType } fh.Compact(0) + if *ts != 0 { - return p.entryBytes.Bytes(), ts, nil, &fh + return seriesKey, ts, nil, &fh } // Nasty hack: Assume that ts==0 means no timestamp. That's not true in // general, but proto3 has no distinction between unset and // default. Need to avoid in the final format. - return p.entryBytes.Bytes(), nil, nil, &fh + return seriesKey, nil, nil, &fh } // TODO(bwplotka): Create sync.Pool for those structs. @@ -256,23 +292,23 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his } sh.Compact(0) if *ts != 0 { - return p.entryBytes.Bytes(), ts, &sh, nil + return seriesKey, ts, &sh, nil } - return p.entryBytes.Bytes(), nil, &sh, nil + return seriesKey, nil, &sh, nil } // Help returns the metric name and help text in the current entry. // Must only be called after Next returned a help entry. // The returned byte slices become invalid after the next call to Next. func (p *ProtobufParser) Help() ([]byte, []byte) { - return p.entryBytes.Bytes(), yoloBytes(p.dec.GetHelp()) + return p.mfName, yoloBytes(p.dec.GetHelp()) } // Type returns the metric name and type in the current entry. // Must only be called after Next returned a type entry. // The returned byte slices become invalid after the next call to Next. func (p *ProtobufParser) Type() ([]byte, model.MetricType) { - n := p.entryBytes.Bytes() + n := p.mfName switch p.dec.GetType() { case dto.MetricType_COUNTER: return n, model.MetricTypeCounter @@ -292,7 +328,7 @@ func (p *ProtobufParser) Type() ([]byte, model.MetricType) { // Must only be called after Next returned a unit entry. // The returned byte slices become invalid after the next call to Next. func (p *ProtobufParser) Unit() ([]byte, []byte) { - return p.entryBytes.Bytes(), []byte(p.dec.GetUnit()) + return p.mfName, []byte(p.dec.GetUnit()) } // Comment always returns nil because comments aren't supported by the protobuf @@ -302,9 +338,22 @@ func (p *ProtobufParser) Comment() []byte { } // Labels writes the labels of the current sample into the passed labels. -// It returns the string from which the metric was parsed. -func (p *ProtobufParser) Labels(l *labels.Labels) { - *l = p.lset.Copy() +func (p *ProtobufParser) Labels(l *labels.Labels) error { + p.builder.Reset() + p.builder.Add(labels.MetricName, p.getMagicName()) + + if err := p.dec.Label(&p.builder); err != nil { + return err + } + + if needed, name, value := p.getMagicLabel(); needed { + p.builder.Add(name, value) + } + + // Sort labels to maintain the sorted labels invariant. + p.builder.Sort() + *l = p.builder.Labels() + return nil } // Exemplar writes the exemplar of the current sample into the passed @@ -426,9 +475,9 @@ func (p *ProtobufParser) Next() (Entry, error) { return EntryInvalid, err } - // We are at the beginning of a metric family. Put only the name - // into entryBytes and validate only name, help, and type for now. + // We are at the beginning of a metric family. name := p.dec.GetName() + p.mfName = yoloBytes(name) if !model.IsValidMetricName(model.LabelValue(name)) { return EntryInvalid, fmt.Errorf("invalid metric name: %s", name) } @@ -456,8 +505,6 @@ func (p *ProtobufParser) Next() (Entry, error) { return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", unit, name) } } - p.entryBytes.Reset() - p.entryBytes.WriteString(name) p.state = EntryHelp case EntryHelp: if p.dec.Unit != "" { @@ -475,9 +522,7 @@ func (p *ProtobufParser) Next() (Entry, error) { } else { p.state = EntrySeries } - if err := p.onSeriesOrHistogramUpdate(); err != nil { - return EntryInvalid, err - } + p.setFieldsDone() case EntrySeries: // Potentially a second series in the metric family. t := p.dec.GetType() @@ -491,9 +536,7 @@ func (p *ProtobufParser) Next() (Entry, error) { if !p.fieldsDone { // Still some fields to iterate over. p.fieldPos++ - if err := p.onSeriesOrHistogramUpdate(); err != nil { - return EntryInvalid, err - } + p.setFieldsDone() return p.state, nil } @@ -518,9 +561,7 @@ func (p *ProtobufParser) Next() (Entry, error) { } return EntryInvalid, err } - if err := p.onSeriesOrHistogramUpdate(); err != nil { - return EntryInvalid, err - } + p.setFieldsDone() case EntryHistogram: // Was Histogram() called and parseClassicHistograms is true? if p.redoClassic { @@ -539,50 +580,17 @@ func (p *ProtobufParser) Next() (Entry, error) { } return EntryInvalid, err } - if err := p.onSeriesOrHistogramUpdate(); err != nil { - return EntryInvalid, err - } default: return EntryInvalid, fmt.Errorf("invalid protobuf parsing state: %d", p.state) } return p.state, nil } -// onSeriesOrHistogramUpdate updates internal state before returning -// a series or histogram. It updates: -// * p.lset. -// * p.entryBytes. -// * p.fieldsDone depending on p.fieldPos. -func (p *ProtobufParser) onSeriesOrHistogramUpdate() error { - p.builder.Reset() - p.builder.Add(labels.MetricName, p.getMagicName()) - - if err := p.dec.Label(&p.builder); err != nil { - return err - } - - if needed, name, value := p.getMagicLabel(); needed { - p.builder.Add(name, value) - } - - // Sort labels to maintain the sorted labels invariant. - p.builder.Sort() - p.builder.Overwrite(&p.lset) - - // entryBytes has to be unique for each series. - p.entryBytes.Reset() - p.lset.Range(func(l labels.Label) { - if l.Name == labels.MetricName { - p.entryBytes.WriteString(l.Value) - return - } - p.entryBytes.WriteByte(model.SeparatorByte) - p.entryBytes.WriteString(l.Name) - p.entryBytes.WriteByte(model.SeparatorByte) - p.entryBytes.WriteString(l.Value) - }) - return nil -} +const ( + countMagicSuffix = "_count" + sumMagicSuffix = "_sum" + bktMagicSuffix = "_bucket" +) // getMagicName usually just returns p.mf.GetType() but adds a magic suffix // ("_count", "_sum", "_bucket") if needed according to the current parser @@ -593,19 +601,39 @@ func (p *ProtobufParser) getMagicName() string { return p.dec.GetName() } if p.fieldPos == -2 { - return p.dec.GetName() + "_count" + return p.dec.GetName() + countMagicSuffix } if p.fieldPos == -1 { - return p.dec.GetName() + "_sum" + return p.dec.GetName() + sumMagicSuffix } if t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM { - return p.dec.GetName() + "_bucket" + return p.dec.GetName() + bktMagicSuffix } return p.dec.GetName() } +func (p *ProtobufParser) setFieldsDone() { + // Native histogram or _count and _sum series. + if p.state == EntryHistogram || p.fieldPos < 0 { + return + } + switch p.dec.GetType() { + case dto.MetricType_SUMMARY: + qq := p.dec.GetSummary().GetQuantile() + p.fieldsDone = p.fieldPos == len(qq)-1 + case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: + bb := p.dec.GetHistogram().GetBucket() + if p.fieldPos >= len(bb) { + p.fieldsDone = true + return + } + b := bb[p.fieldPos] + p.fieldsDone = math.IsInf(b.GetUpperBound(), +1) + } +} + // getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if -// so, its name and value. It also sets p.fieldsDone if applicable. +// so, its name and value. func (p *ProtobufParser) getMagicLabel() (bool, string, string) { // Native histogram or _count and _sum series. if p.state == EntryHistogram || p.fieldPos < 0 { @@ -615,16 +643,13 @@ func (p *ProtobufParser) getMagicLabel() (bool, string, string) { case dto.MetricType_SUMMARY: qq := p.dec.GetSummary().GetQuantile() q := qq[p.fieldPos] - p.fieldsDone = p.fieldPos == len(qq)-1 return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile()) case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: bb := p.dec.GetHistogram().GetBucket() if p.fieldPos >= len(bb) { - p.fieldsDone = true return true, model.BucketLabel, "+Inf" } b := bb[p.fieldPos] - p.fieldsDone = math.IsInf(b.GetUpperBound(), +1) return true, model.BucketLabel, formatOpenMetricsFloat(b.GetUpperBound()) } return false, "", "" @@ -660,6 +685,23 @@ func formatOpenMetricsFloat(f float64) string { return string(*bp) } +func writeFloat(b *bytes.Buffer, f float64) { + switch { + case math.IsNaN(f): + b.WriteString("NaN") + return + case math.IsInf(f, +1): + b.WriteString("+Inf") + return + case math.IsInf(f, -1): + b.WriteString("-Inf") + return + } + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], math.Float64bits(f)) + b.Write(buf[:]) +} + // isNativeHistogram returns false iff the provided histograms has no spans at // all (neither positive nor negative) and a zero threshold of 0 and a zero // count of 0. In principle, this could still be meant to be a native histogram diff --git a/prompb/io/prometheus/client/decoder.go b/prompb/io/prometheus/client/decoder.go index b21f78cc9c..73ccce6a71 100644 --- a/prompb/io/prometheus/client/decoder.go +++ b/prompb/io/prometheus/client/decoder.go @@ -105,6 +105,17 @@ func (m *MetricStreamingDecoder) NextMetric() error { return nil } +// LabelsProtoData returns the portion of the serialized protobuf that +// contains metric labels (without metric family name). +// Callers wanting to use it as a cache key need to do it with care given +// https://protobuf.dev/programming-guides/serialization-not-canonical/. +func (m *MetricStreamingDecoder) LabelsProtoData() []byte { + if len(m.labels) == 0 { + return nil + } + return m.mData[m.labels[0].start:m.labels[len(m.labels)-1].end] +} + // resetMetric resets all the fields in m to equal the zero value, but re-using slices memory. func (m *MetricStreamingDecoder) resetMetric() { m.labels = m.labels[:0] diff --git a/scrape/scrape.go b/scrape/scrape.go index 58314af4eb..1a16e0da8b 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -881,13 +881,6 @@ type loop interface { disableEndOfRunStalenessMarkers() } -type cacheEntry struct { - ref storage.SeriesRef - lastIter uint64 - hash uint64 - lset labels.Labels -} - type scrapeLoop struct { scraper scraper l *slog.Logger @@ -938,6 +931,23 @@ type scrapeLoop struct { skipOffsetting bool // For testability. } +type ( + fastSeriesCacheKey = string + mfNameCacheKey = string + seriesHashKey = uint64 +) + +type cacheEntry struct { + accessIter uint64 + appended *cacheSeriesEntry // nil means cached dropped series. +} + +type cacheSeriesEntry struct { + ref storage.SeriesRef + hash seriesHashKey + lset labels.Labels +} + // scrapeCache tracks mappings of exposed metric strings to label sets and // storage references. Additionally, it tracks staleness of series between // scrapes. @@ -948,24 +958,26 @@ type scrapeCache struct { // How many series and metadata entries there were at the last success. successfulCount int - // Parsed string to an entry with information about the actual label set - // and its storage reference. - series map[string]*cacheEntry - - // Cache of dropped metric strings and their iteration. The iteration must - // be a pointer so we can update it. - droppedSeries map[string]*uint64 + // series is a main per series (and histograms) cache, for both appended + // and dropped series. + // See explanation of the fastSeriesCacheKey in textparse.Parser.Series() + series map[fastSeriesCacheKey]*cacheEntry + // NOTE(bwplotka): Due to soft stability guarantee of fastSeriesCacheKey we could + // check if the same seriesHashKey is not currently stored in the cache behind + // a different fastSeriesCacheKey. Trying without this first, we do regularly + // flush this cache, so this might be not needed. + // checksum map[seriesHashKey]*fastSeriesCacheKey // seriesCur and seriesPrev store the labels of series that were seen - // in the current and previous scrape. + // in the current and previous scrape. Used only when staleness is tracked. // We hold two maps and swap them out to save allocations. - seriesCur map[uint64]labels.Labels - seriesPrev map[uint64]labels.Labels + seriesCur map[seriesHashKey]labels.Labels + seriesPrev map[seriesHashKey]labels.Labels // TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to // avoid locking (using metadata API can block scraping). - metaMtx sync.Mutex // Mutex is needed due to api touching it when metadata is queried. - metadata map[string]*metaEntry // metadata by metric family name. + metaMtx sync.Mutex // Mutex is needed due to api touching it when metadata is queried. + metadata map[mfNameCacheKey]*metaEntry // metadata by metric family name. metrics *scrapeMetrics } @@ -985,18 +997,17 @@ func (m *metaEntry) size() int { func newScrapeCache(metrics *scrapeMetrics) *scrapeCache { return &scrapeCache{ - series: map[string]*cacheEntry{}, - droppedSeries: map[string]*uint64{}, - seriesCur: map[uint64]labels.Labels{}, - seriesPrev: map[uint64]labels.Labels{}, - metadata: map[string]*metaEntry{}, - metrics: metrics, + series: map[fastSeriesCacheKey]*cacheEntry{}, + seriesCur: map[seriesHashKey]labels.Labels{}, + seriesPrev: map[seriesHashKey]labels.Labels{}, + metadata: map[mfNameCacheKey]*metaEntry{}, + metrics: metrics, } } func (c *scrapeCache) iterDone(flushCache bool) { c.metaMtx.Lock() - count := len(c.series) + len(c.droppedSeries) + len(c.metadata) + count := len(c.series) + len(c.metadata) c.metaMtx.Unlock() switch { @@ -1017,15 +1028,10 @@ func (c *scrapeCache) iterDone(flushCache bool) { // or multiple string representations of the same metric. Clean up entries // that haven't appeared in the last scrape. for s, e := range c.series { - if c.iter != e.lastIter { + if c.iter != e.accessIter { delete(c.series, s) } } - for s, iter := range c.droppedSeries { - if c.iter != *iter { - delete(c.droppedSeries, s) - } - } c.metaMtx.Lock() for m, e := range c.metadata { // Keep metadata around for 10 scrapes after its metric disappeared. @@ -1047,34 +1053,37 @@ func (c *scrapeCache) iterDone(flushCache bool) { } } -func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) { - e, ok := c.series[string(met)] +func (c *scrapeCache) getSeriesEntry(fastSeriesKey []byte) (entry *cacheSeriesEntry, ok bool, dropped bool, alreadyScraped bool) { + e, ok := c.series[yoloString(fastSeriesKey)] if !ok { - return nil, false, false + return nil, false, false, false } - alreadyScraped := e.lastIter == c.iter - e.lastIter = c.iter - return e, true, alreadyScraped + alreadyScraped = e.accessIter == c.iter + e.accessIter = c.iter + if e.appended == nil { + return nil, true, true, alreadyScraped // Dropped. + } + return e.appended, true, false, alreadyScraped } -func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) { +func (c *scrapeCache) addRef(fastSeriesKey []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) { if ref == 0 { return } - c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash} -} - -func (c *scrapeCache) addDropped(met []byte) { - iter := c.iter - c.droppedSeries[string(met)] = &iter -} - -func (c *scrapeCache) getDropped(met []byte) bool { - iterp, ok := c.droppedSeries[string(met)] - if ok { - *iterp = c.iter + c.series[fastSeriesCacheKey(fastSeriesKey)] = &cacheEntry{ + accessIter: c.iter, + appended: &cacheSeriesEntry{ + ref: ref, + hash: hash, + lset: lset, + }, + } +} + +func (c *scrapeCache) addDropped(fastSeriesKey []byte) { + c.series[fastSeriesCacheKey(fastSeriesKey)] = &cacheEntry{ + accessIter: c.iter, } - return ok } func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) { @@ -1653,7 +1662,7 @@ loop: var ( et textparse.Entry sampleAdded, isHistogram bool - met []byte + fastSeriesKey []byte parsedTimestamp *int64 val float64 h *histogram.Histogram @@ -1689,9 +1698,9 @@ loop: t := defTime if isHistogram { - met, parsedTimestamp, h, fh = p.Histogram() + fastSeriesKey, parsedTimestamp, h, fh = p.Histogram() } else { - met, parsedTimestamp, val = p.Series() + fastSeriesKey, parsedTimestamp, val = p.Series() } if !sl.honorTimestamps { parsedTimestamp = nil @@ -1700,21 +1709,25 @@ loop: t = *parsedTimestamp } - if sl.cache.getDropped(met) { + // Populating labels and hashing can be expensive, so we keep a short-term cache + // flushed on iterDone regularly. + ce, seriesCached, seriesDropped, seriesAlreadyScraped := sl.cache.getSeriesEntry(fastSeriesKey) + if seriesDropped { continue } - ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met) + var ( ref storage.SeriesRef hash uint64 ) - if seriesCached { ref = ce.ref lset = ce.lset hash = ce.hash } else { - p.Labels(&lset) + if err = p.Labels(&lset); err != nil { + break loop + } hash = lset.Hash() // Hash label set as it is seen local to the target. Then add target labels @@ -1723,7 +1736,7 @@ loop: // The label set may be set to empty to indicate dropping. if lset.IsEmpty() { - sl.cache.addDropped(met) + sl.cache.addDropped(fastSeriesKey) continue } @@ -1744,7 +1757,7 @@ loop: } if seriesAlreadyScraped && parsedTimestamp == nil { - err = storage.ErrDuplicateSampleForTimestamp + err = storage.ErrDuplicateSampleForTimestamp // TODO(bwplotka): Should we return early here? } else { if sl.enableCTZeroIngestion { if ctMs := p.CreatedTimestamp(); ctMs != nil { @@ -1760,7 +1773,8 @@ loop: if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. // CT is an experimental feature. For now, we don't need to fail the // scrape on errors updating the created timestamp, log debug. - sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + // TODO(bwplotka): Consider optimizing string alloc. + sl.l.Debug("Error when appending CT in scrape loop", "series", string(fastSeriesKey), "ct", *ctMs, "t", t, "err", err) } } } @@ -1777,15 +1791,16 @@ loop: } if err == nil { - if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { - sl.cache.trackStaleness(ce.hash, ce.lset) + if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && seriesCached { + sl.cache.trackStaleness(hash, lset) } } - sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs) + sampleAdded, err = sl.checkAddError(fastSeriesKey, err, &sampleLimitErr, &bucketLimitErr, &appErrs) if err != nil { if !errors.Is(err, storage.ErrNotFound) { - sl.l.Debug("Unexpected error", "series", string(met), "err", err) + // TODO(bwplotka): Consider optimizing string alloc. + sl.l.Debug("Unexpected error", "series", string(fastSeriesKey), "err", err) } break loop } @@ -1795,7 +1810,7 @@ loop: // Bypass staleness logic if there is an explicit timestamp. sl.cache.trackStaleness(hash, lset) } - sl.cache.addRef(met, ref, lset, hash) + sl.cache.addRef(fastSeriesKey, ref, lset, hash) if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil { seriesAdded++ } @@ -1962,7 +1977,7 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo // Adds samples to the appender, checking the error, and then returns the # of samples added, // whether the caller should continue to process more samples, and any sample or bucket limit errors. -func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) { +func (sl *scrapeLoop) checkAddError(fastSeriesKey []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) { switch { case err == nil: return true, nil @@ -1970,17 +1985,17 @@ func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucke return false, storage.ErrNotFound case errors.Is(err, storage.ErrOutOfOrderSample): appErrs.numOutOfOrder++ - sl.l.Debug("Out of order sample", "series", string(met)) + sl.l.Debug("Out of order sample", "series", string(fastSeriesKey)) // TODO(bwplotka): Consider optimizing string alloc. sl.metrics.targetScrapeSampleOutOfOrder.Inc() return false, nil case errors.Is(err, storage.ErrDuplicateSampleForTimestamp): appErrs.numDuplicates++ - sl.l.Debug("Duplicate sample for timestamp", "series", string(met)) + sl.l.Debug("Duplicate sample for timestamp", "series", string(fastSeriesKey)) // TODO(bwplotka): Consider optimizing string alloc. sl.metrics.targetScrapeSampleDuplicate.Inc() return false, nil case errors.Is(err, storage.ErrOutOfBounds): appErrs.numOutOfBounds++ - sl.l.Debug("Out of bounds metric", "series", string(met)) + sl.l.Debug("Out of bounds metric", "series", string(fastSeriesKey)) // TODO(bwplotka): Consider optimizing string alloc. sl.metrics.targetScrapeSampleOutOfBounds.Inc() return false, nil case errors.Is(err, errSampleLimit): @@ -2150,10 +2165,10 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er } func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t int64, v float64, b *labels.Builder) error { - ce, ok, _ := sl.cache.get(s.name) + ce, seriesCached, _, _ := sl.cache.getSeriesEntry(s.name) var ref storage.SeriesRef var lset labels.Labels - if ok { + if seriesCached { ref = ce.ref lset = ce.lset } else { @@ -2168,7 +2183,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t in ref, err := app.Append(ref, lset, t, v) switch { case err == nil: - if !ok { + if !seriesCached { sl.cache.addRef(s.name, ref, lset, lset.Hash()) // We only need to add metadata once a scrape target appears. if sl.appendMetadataToWAL { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index b30dbfa1b9..fb57ba97b3 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1476,10 +1476,16 @@ func TestPromTextToProto(t *testing.T) { // // Recommended CLI invocation: /* - export bench=append-v1 && go test ./scrape/... \ + export bench=protoopt-v2 && go test ./scrape/... \ -run '^$' -bench '^BenchmarkScrapeLoopAppend' \ -benchtime 5s -count 6 -cpu 2 -timeout 999m \ | tee ${bench}.txt + + export bench=protoopt-v2pp && go test ./scrape/... \ + -run '^$' -bench '^BenchmarkScrapeLoopAppend/data=237FamsAllTypes/fmt=PromProto' \ + -benchtime 5s -cpu 2 -timeout 999m \ + -memprofile=${bench}.mem.pprof \ + | tee ${bench}.txt */ func BenchmarkScrapeLoopAppend(b *testing.B) { for _, data := range []struct { @@ -3517,7 +3523,7 @@ func TestScrapeAddFast(t *testing.T) { // Poison the cache. There is just one entry, and one series in the // storage. Changing the ref will create a 'not found' error. for _, v := range sl.getCache().series { - v.ref++ + v.appended.ref++ } slApp = sl.appender(ctx)