diff --git a/config/config.go b/config/config.go index 9defa10d48..5615789789 100644 --- a/config/config.go +++ b/config/config.go @@ -617,6 +617,8 @@ type ScrapeConfig struct { ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"` // Whether to scrape a classic histogram that is also exposed as a native histogram. ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"` + // Whether to convert a scraped classic histogram into a native histogram with custom buckets. + ConvertClassicHistograms bool `yaml:"convert_classic_histograms,omitempty"` // The HTTP resource path on which to fetch metrics from targets. MetricsPath string `yaml:"metrics_path,omitempty"` // The URL scheme with which to fetch metrics from targets. diff --git a/promql/promqltest/test.go b/promql/promqltest/test.go index f3a773be8d..576b30d5b4 100644 --- a/promql/promqltest/test.go +++ b/promql/promqltest/test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/almost" + "github.com/prometheus/prometheus/util/convertnhcb" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" ) @@ -460,43 +461,22 @@ func (cmd *loadCmd) append(a storage.Appender) error { return nil } -func getHistogramMetricBase(m labels.Labels, suffix string) (labels.Labels, uint64) { - mName := m.Get(labels.MetricName) - baseM := labels.NewBuilder(m). - Set(labels.MetricName, strings.TrimSuffix(mName, suffix)). - Del(labels.BucketLabel). 
- Labels() - hash := baseM.Hash() - return baseM, hash -} - type tempHistogramWrapper struct { metric labels.Labels upperBounds []float64 - histogramByTs map[int64]tempHistogram + histogramByTs map[int64]convertnhcb.TempHistogram } func newTempHistogramWrapper() tempHistogramWrapper { return tempHistogramWrapper{ upperBounds: []float64{}, - histogramByTs: map[int64]tempHistogram{}, + histogramByTs: map[int64]convertnhcb.TempHistogram{}, } } -type tempHistogram struct { - bucketCounts map[float64]float64 - count float64 - sum float64 -} - -func newTempHistogram() tempHistogram { - return tempHistogram{ - bucketCounts: map[float64]float64{}, - } -} - -func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogramWrapper func(*tempHistogramWrapper), updateHistogram func(*tempHistogram, float64)) { - m2, m2hash := getHistogramMetricBase(m, suffix) +func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogramWrapper func(*tempHistogramWrapper), updateHistogram func(*convertnhcb.TempHistogram, float64)) { + m2 := convertnhcb.GetHistogramMetricBase(m, suffix) + m2hash := m2.Hash() histogramWrapper, exists := histogramMap[m2hash] if !exists { histogramWrapper = newTempHistogramWrapper() @@ -511,7 +491,7 @@ func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap } histogram, exists := histogramWrapper.histogramByTs[s.T] if !exists { - histogram = newTempHistogram() + histogram = convertnhcb.NewTempHistogram() } updateHistogram(&histogram, s.F) histogramWrapper.histogramByTs[s.T] = histogram @@ -519,34 +499,6 @@ func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap histogramMap[m2hash] = histogramWrapper } -func processUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) { - sort.Float64s(upperBounds0) - 
upperBounds := make([]float64, 0, len(upperBounds0)) - prevLE := math.Inf(-1) - for _, le := range upperBounds0 { - if le != prevLE { // deduplicate - upperBounds = append(upperBounds, le) - prevLE = le - } - } - var customBounds []float64 - if upperBounds[len(upperBounds)-1] == math.Inf(1) { - customBounds = upperBounds[:len(upperBounds)-1] - } else { - customBounds = upperBounds - } - return upperBounds, &histogram.FloatHistogram{ - Count: 0, - Sum: 0, - Schema: histogram.CustomBucketsSchema, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: uint32(len(upperBounds))}, - }, - PositiveBuckets: make([]float64, len(upperBounds)), - CustomValues: customBounds, - } -} - // If classic histograms are defined, convert them into native histograms with custom // bounds and append the defined time series to the storage. func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error { @@ -565,16 +517,16 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error { } processClassicHistogramSeries(m, "_bucket", histogramMap, smpls, func(histogramWrapper *tempHistogramWrapper) { histogramWrapper.upperBounds = append(histogramWrapper.upperBounds, le) - }, func(histogram *tempHistogram, f float64) { - histogram.bucketCounts[le] = f + }, func(histogram *convertnhcb.TempHistogram, f float64) { + histogram.BucketCounts[le] = f }) case strings.HasSuffix(mName, "_count"): - processClassicHistogramSeries(m, "_count", histogramMap, smpls, nil, func(histogram *tempHistogram, f float64) { - histogram.count = f + processClassicHistogramSeries(m, "_count", histogramMap, smpls, nil, func(histogram *convertnhcb.TempHistogram, f float64) { + histogram.Count = f }) case strings.HasSuffix(mName, "_sum"): - processClassicHistogramSeries(m, "_sum", histogramMap, smpls, nil, func(histogram *tempHistogram, f float64) { - histogram.sum = f + processClassicHistogramSeries(m, "_sum", histogramMap, smpls, nil, func(histogram *convertnhcb.TempHistogram, f float64) { + histogram.Sum 
= f }) } } @@ -582,30 +534,14 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error { // Convert the collated classic histogram data into native histograms // with custom bounds and append them to the storage. for _, histogramWrapper := range histogramMap { - upperBounds, fhBase := processUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds) + upperBounds, fhBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds) samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs)) for t, histogram := range histogramWrapper.histogramByTs { - fh := fhBase.Copy() - var prevCount, total float64 - for i, le := range upperBounds { - currCount, exists := histogram.bucketCounts[le] - if !exists { - currCount = 0 - } - count := currCount - prevCount - fh.PositiveBuckets[i] = count - total += count - prevCount = currCount - } - fh.Sum = histogram.sum - if histogram.count != 0 { - total = histogram.count - } - fh.Count = total - s := promql.Sample{T: t, H: fh.Compact(0)} - if err := s.H.Validate(); err != nil { + fh := convertnhcb.ConvertHistogramWrapper(histogram, upperBounds, fhBase) + if err := fh.Validate(); err != nil { return err } + s := promql.Sample{T: t, H: fh} samples = append(samples, s) } sort.Slice(samples, func(i, j int) bool { return samples[i].T < samples[j].T }) diff --git a/scrape/scrape.go b/scrape/scrape.go index a0b681444c..c3005304e3 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -47,6 +47,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/convertnhcb" "github.com/prometheus/prometheus/util/pool" ) @@ -111,6 +112,7 @@ type scrapeLoopOptions struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool + convertClassicHistograms bool mrc []*relabel.Config cache *scrapeCache @@ -178,6 +180,7 @@ func newScrapePool(cfg 
*config.ScrapeConfig, app storage.Appendable, offsetSeed opts.interval, opts.timeout, opts.scrapeClassicHistograms, + opts.convertClassicHistograms, options.EnableNativeHistogramsIngestion, options.EnableCreatedTimestampZeroIngestion, options.ExtraMetrics, @@ -440,6 +443,7 @@ func (sp *scrapePool) sync(targets []*Target) { trackTimestampsStaleness = sp.config.TrackTimestampsStaleness mrc = sp.config.MetricRelabelConfigs scrapeClassicHistograms = sp.config.ScrapeClassicHistograms + convertClassicHistograms = sp.config.ConvertClassicHistograms ) sp.targetMtx.Lock() @@ -476,6 +480,7 @@ func (sp *scrapePool) sync(targets []*Target) { interval: interval, timeout: timeout, scrapeClassicHistograms: scrapeClassicHistograms, + convertClassicHistograms: convertClassicHistograms, }) if err != nil { l.setForcedError(err) @@ -828,6 +833,7 @@ type scrapeLoop struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool + convertClassicHistograms bool // Feature flagged options. enableNativeHistogramIngestion bool @@ -881,6 +887,9 @@ type scrapeCache struct { metadata map[string]*metaEntry metrics *scrapeMetrics + + nhcbLabels map[uint64]labels.Labels + nhcbBuilder map[uint64]convertnhcb.TempHistogram } // metaEntry holds meta information about a metric. 
@@ -904,6 +913,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
 		seriesPrev:    map[uint64]labels.Labels{},
 		metadata:      map[string]*metaEntry{},
 		metrics:       metrics,
+		nhcbLabels:    map[uint64]labels.Labels{},
+		nhcbBuilder:   map[uint64]convertnhcb.TempHistogram{},
 	}
 }
 
@@ -1107,6 +1118,13 @@ func (c *scrapeCache) LengthMetadata() int {
 	return len(c.metadata)
 }
 
+// resetNhcb discards all state collected for converting classic histograms by
+// replacing both maps with fresh, empty ones.
+func (c *scrapeCache) resetNhcb() {
+	c.nhcbLabels = map[uint64]labels.Labels{}
+	c.nhcbBuilder = map[uint64]convertnhcb.TempHistogram{}
+}
+
 func newScrapeLoop(ctx context.Context,
 	sc scraper,
 	l log.Logger,
@@ -1127,6 +1145,7 @@ func newScrapeLoop(ctx context.Context,
 	interval time.Duration,
 	timeout time.Duration,
 	scrapeClassicHistograms bool,
+	convertClassicHistograms bool,
 	enableNativeHistogramIngestion bool,
 	enableCTZeroIngestion bool,
 	reportExtraMetrics bool,
@@ -1180,6 +1199,7 @@ func newScrapeLoop(ctx context.Context,
 		interval:                       interval,
 		timeout:                        timeout,
 		scrapeClassicHistograms:        scrapeClassicHistograms,
+		convertClassicHistograms:       convertClassicHistograms,
 		enableNativeHistogramIngestion: enableNativeHistogramIngestion,
 		enableCTZeroIngestion:          enableCTZeroIngestion,
 		reportExtraMetrics:             reportExtraMetrics,
@@ -1641,6 +1661,30 @@ loop:
 				}
 			} else {
 				ref, err = app.Append(ref, lset, t, val)
+
+				if sl.convertClassicHistograms {
+					// Collect the pieces of classic histograms by name suffix; the
+					// collected histograms are converted and appended further down.
+					mName := lset.Get(labels.MetricName)
+					switch {
+					case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel):
+						// A bucket whose "le" does not parse, or parses to NaN, is skipped.
+						le, parseErr := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
+						if parseErr == nil && !math.IsNaN(le) {
+							processClassicHistogramSeries(lset, "_bucket", sl.cache, func(hist *convertnhcb.TempHistogram) {
+								hist.BucketCounts[le] = val
+							})
+						}
+					case strings.HasSuffix(mName, "_count"):
+						processClassicHistogramSeries(lset, "_count", sl.cache, func(hist *convertnhcb.TempHistogram) {
+							hist.Count = val
+						})
+					case strings.HasSuffix(mName, "_sum"):
+						processClassicHistogramSeries(lset, "_sum", sl.cache, func(hist *convertnhcb.TempHistogram) {
+							hist.Sum = val
+						})
+					}
+				}
 			}
 		}
 
@@ -1762,9 +1806,51 @@ loop:
 			return err == nil
 		})
 	}
+
+	if sl.convertClassicHistograms {
+		for hash, th := range sl.cache.nhcbBuilder {
+			lset, ok := sl.cache.nhcbLabels[hash]
+			// A series that only matched "_count" or "_sum" has no buckets; with
+			// an empty bound list the conversion would index past the slice.
+			if !ok || len(th.BucketCounts) == 0 {
+				continue
+			}
+			ub := make([]float64, 0, len(th.BucketCounts))
+			for b := range th.BucketCounts {
+				ub = append(ub, b)
+			}
+			upperBounds, fhBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub)
+			fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, fhBase)
+			if fh.Validate() != nil {
+				continue
+			}
+			// Best effort: a rejected histogram is dropped. The result must not be
+			// stored in err, which this function returns and which would otherwise
+			// be overwritten (set or cleared) by whichever append happens last.
+			_, _ = app.AppendHistogram(0, lset, defTime, nil, fh)
+		}
+		sl.cache.resetNhcb()
+	}
+
 	return
 }
 
+// processClassicHistogramSeries records one sample of a classic histogram in
+// cache. The series is keyed by the hash of its labels with suffix trimmed from
+// the metric name and the bucket label removed; updateHist receives the
+// histogram under construction for that key, created on first use.
+func processClassicHistogramSeries(lset labels.Labels, suffix string, cache *scrapeCache, updateHist func(*convertnhcb.TempHistogram)) {
+	base := convertnhcb.GetHistogramMetricBase(lset, suffix)
+	hash := base.Hash()
+	cache.nhcbLabels[hash] = base
+	th, ok := cache.nhcbBuilder[hash]
+	if !ok {
+		th = convertnhcb.NewTempHistogram()
+	}
+	updateHist(&th)
+	cache.nhcbBuilder[hash] = th
+}
+
 // Adds samples to the appender, checking the error, and then returns the # of samples added,
 // whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index a3fe6ac1a5..bcdb455f7c 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -679,6 +679,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
 		false,
 		false,
 		false,
+		false,
 		nil,
 		false,
 		newTestScrapeMetrics(t),
@@ -821,6 +822,7 @@ func TestScrapeLoopRun(t *testing.T) {
 		false,
 		false,
 		false,
+		false,
 		nil,
 		false,
 		scrapeMetrics,
@@ -965,6 +967,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
 		false,
 		false,
 		false,
+		false,
 		nil,
 		false,
 		scrapeMetrics,
@@ -3366,6 +3369,106 @@ test_summary_count 199
 	checkValues("quantile", expectedQuantileValues, series)
 }
 
+// TestConvertClassicHistograms checks that a scraped classic histogram is also stored as one native histogram with custom buckets.
+func TestConvertClassicHistograms(t *testing.T) {
+	simpleStorage := teststorage.New(t)
+	defer simpleStorage.Close()
+
+	cfg := &config.ScrapeConfig{
+		JobName:                  "test",
+		SampleLimit:              100,
+		Scheme:                   "http",
+		ScrapeInterval:           model.Duration(100 * time.Millisecond),
+		ScrapeTimeout:            model.Duration(100 * time.Millisecond),
+		ConvertClassicHistograms: true,
+	}
+
+	metricsText := `
+# HELP test_histogram This is a histogram with default buckets
+# TYPE test_histogram histogram
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.005"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.01"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.025"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.05"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.1"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.25"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="0.5"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="1"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="2.5"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="5"} 0
+test_histogram_bucket{address="0.0.0.0",port="5001",le="10"} 1
+test_histogram_bucket{address="0.0.0.0",port="5001",le="+Inf"} 1
+test_histogram_sum{address="0.0.0.0",port="5001"} 10
+test_histogram_count{address="0.0.0.0",port="5001"} 1
+`
+
+	// The expected "le" values do not have the trailing ".0".
+	expectedLeValues := []string{"0.005", "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "1", "2.5", "5", "10", "+Inf"}
+
+	scrapeCount := 0
+	scraped := make(chan bool)
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprint(w, metricsText)
+		scrapeCount++
+		if scrapeCount == 3 { // close exactly once; closing again on a later scrape would panic
+			close(scraped)
+		}
+	}))
+	defer ts.Close()
+
+	sp, err := newScrapePool(cfg, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
+	require.NoError(t, err)
+	defer sp.stop()
+
+	testURL, err := url.Parse(ts.URL)
+	require.NoError(t, err)
+	sp.Sync([]*targetgroup.Group{
+		{
+			Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
+		},
+	})
+	require.Len(t, sp.ActiveTargets(), 1)
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatalf("target was not scraped")
+	case <-scraped:
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	q, err := simpleStorage.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
+	require.NoError(t, err)
+	defer q.Close()
+
+	checkValues := func(labelName string, expectedValues []string, series storage.SeriesSet) {
+		foundLeValues := map[string]bool{}
+
+		for series.Next() {
+			s := series.At()
+			v := s.Labels().Get(labelName)
+			require.NotContains(t, foundLeValues, v, "duplicate label value found")
+			foundLeValues[v] = true
+		}
+
+		require.Equal(t, len(expectedValues), len(foundLeValues), "number of label values not as expected")
+		for _, v := range expectedValues {
+			require.Contains(t, foundLeValues, v, "label value not found")
+		}
+	}
+
+	series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_histogram_bucket"))
+	checkValues("le", expectedLeValues, series)
+
+	series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_histogram"))
+	count := 0
+	for series.Next() {
+		count++
+	}
+	require.Equal(t, 1, count, "number of series not as expected")
+}
+
 func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
 	appender := &collectResultAppender{}
 	var (
diff --git a/util/convertnhcb/convertnhcb.go b/util/convertnhcb/convertnhcb.go
new file mode 100644
index 0000000000..8a96553864
--- /dev/null
+++ b/util/convertnhcb/convertnhcb.go
@@ -0,0 +1,92 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package convertnhcb
+
+import (
+	"math"
+	"sort"
+	"strings"
+
+	"github.com/prometheus/prometheus/model/histogram"
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+// TempHistogram holds one classic histogram: bucket values keyed by upper bound, count and sum.
+type TempHistogram struct {
+	BucketCounts map[float64]float64
+	Count        float64
+	Sum          float64
+}
+
+// NewTempHistogram returns a TempHistogram whose bucket map is ready for writes.
+func NewTempHistogram() TempHistogram {
+	return TempHistogram{BucketCounts: map[float64]float64{}}
+}
+
+// ProcessUpperBoundsAndCreateBaseHistogram sorts upperBounds0 in place, drops duplicates (and
+// -Inf, the dedup sentinel) and returns the bounds plus a zeroed custom-buckets histogram with one
+// bucket per bound; a trailing +Inf stays out of CustomValues. No bounds give a bucketless base.
+func ProcessUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) {
+	sort.Float64s(upperBounds0)
+	upperBounds := make([]float64, 0, len(upperBounds0))
+	prevLE := math.Inf(-1)
+	for _, le := range upperBounds0 {
+		if le != prevLE { // deduplicate
+			upperBounds = append(upperBounds, le)
+			prevLE = le
+		}
+	}
+	if len(upperBounds) == 0 { // indexing the last bound below would panic
+		return upperBounds, &histogram.FloatHistogram{Schema: histogram.CustomBucketsSchema}
+	}
+	customBounds := upperBounds
+	if upperBounds[len(upperBounds)-1] == math.Inf(1) {
+		customBounds = upperBounds[:len(upperBounds)-1]
+	}
+	return upperBounds, &histogram.FloatHistogram{
+		Schema:          histogram.CustomBucketsSchema,
+		PositiveSpans:   []histogram.Span{{Offset: 0, Length: uint32(len(upperBounds))}},
+		PositiveBuckets: make([]float64, len(upperBounds)),
+		CustomValues:    customBounds,
+	}
+}
+
+// ConvertHistogramWrapper copies fhBase, stores the difference between consecutive cumulative
+// buckets of hist, and sets Count to hist.Count, or to the bucket total when hist.Count is 0.
+func ConvertHistogramWrapper(hist TempHistogram, upperBounds []float64, fhBase *histogram.FloatHistogram) *histogram.FloatHistogram {
+	fh := fhBase.Copy()
+	var prevCount, total float64
+	for i, le := range upperBounds {
+		currCount := hist.BucketCounts[le] // a bound without a sample reads as 0
+		count := currCount - prevCount
+		fh.PositiveBuckets[i] = count
+		total += count
+		prevCount = currCount
+	}
+	fh.Sum = hist.Sum
+	if hist.Count != 0 {
+		total = hist.Count
+	}
+	fh.Count = total
+	return fh.Compact(0)
+}
+
+// GetHistogramMetricBase returns m with suffix trimmed from the metric name and the bucket label removed.
+func GetHistogramMetricBase(m labels.Labels, suffix string) labels.Labels {
+	return labels.NewBuilder(m).
+		Set(labels.MetricName, strings.TrimSuffix(m.Get(labels.MetricName), suffix)).
+		Del(labels.BucketLabel).
+		Labels()
+}