From 8d9dfa075d2c0b6de22d8cb7288b0161802f9061 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 19 Jun 2025 07:53:46 +1000 Subject: [PATCH] promql: reuse `histogramStatsIterator` where possible, and expose it for other implementations to use (#16686) * Expose type * Add `Reset` method --------- Signed-off-by: Charles Korn --- promql/engine.go | 6 ++ promql/histogram_stats_iterator.go | 48 +++++++----- promql/histogram_stats_iterator_test.go | 97 +++++++++++++++++-------- 3 files changed, 103 insertions(+), 48 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 06dcd06bda..0622bcb388 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3931,6 +3931,12 @@ func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries { } func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + // Try to reuse the iterator if we can. + if statsIterator, ok := it.(*HistogramStatsIterator); ok { + statsIterator.Reset(s.Series.Iterator(statsIterator.Iterator)) + return statsIterator + } + return NewHistogramStatsIterator(s.Series.Iterator(it)) } diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go index 59814d1b35..cbc717cac0 100644 --- a/promql/histogram_stats_iterator.go +++ b/promql/histogram_stats_iterator.go @@ -19,7 +19,11 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) -type histogramStatsIterator struct { +// HistogramStatsIterator is an iterator that returns histogram objects +// which have only their sum and count values populated. The iterator handles +// counter reset detection internally and sets the counter reset hint accordingly +// in each returned histogram object. +type HistogramStatsIterator struct { chunkenc.Iterator currentH *histogram.Histogram @@ -27,24 +31,30 @@ type histogramStatsIterator struct { currentFH *histogram.FloatHistogram lastFH *histogram.FloatHistogram + + currentSeriesRead bool } -// NewHistogramStatsIterator creates an iterator which returns histogram objects -// which have only their sum and count values populated. The iterator handles -// counter reset detection internally and sets the counter reset hint accordingly -// in each returned histogram objects. -func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator { - return &histogramStatsIterator{ +// NewHistogramStatsIterator creates a new HistogramStatsIterator. +func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator { + return &HistogramStatsIterator{ Iterator: it, currentH: &histogram.Histogram{}, currentFH: &histogram.FloatHistogram{}, } } +// Reset resets this iterator for use with a new underlying iterator, reusing +// objects already allocated where possible. +func (f *HistogramStatsIterator) Reset(it chunkenc.Iterator) { + f.Iterator = it + f.currentSeriesRead = false +} + // AtHistogram returns the next timestamp/histogram pair. The counter reset // detection is guaranteed to be correct only when the caller does not switch // between AtHistogram and AtFloatHistogram calls. -func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { +func (f *HistogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { var t int64 t, f.currentH = f.Iterator.AtHistogram(f.currentH) if value.IsStaleNaN(f.currentH.Sum) { @@ -76,7 +86,7 @@ func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *hi // AtFloatHistogram returns the next timestamp/float histogram pair. The counter // reset detection is guaranteed to be correct only when the caller does not // switch between AtHistogram and AtFloatHistogram calls. -func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { +func (f *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { var t int64 t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH) if value.IsStaleNaN(f.currentFH.Sum) { @@ -104,31 +114,35 @@ func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) return t, fh } -func (f *histogramStatsIterator) setLastH(h *histogram.Histogram) { +func (f *HistogramStatsIterator) setLastH(h *histogram.Histogram) { f.lastFH = nil if f.lastH == nil { f.lastH = h.Copy() } else { h.CopyTo(f.lastH) } + + f.currentSeriesRead = true } -func (f *histogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { +func (f *HistogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { f.lastH = nil if f.lastFH == nil { f.lastFH = fh.Copy() } else { fh.CopyTo(f.lastFH) } + + f.currentSeriesRead = true } -func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { +func (f *HistogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { if hint != histogram.UnknownCounterReset { return hint } prevFH := f.lastFH - if prevFH == nil { - if f.lastH == nil { + if prevFH == nil || !f.currentSeriesRead { + if f.lastH == nil || !f.currentSeriesRead { // We don't know if there's a counter reset. return histogram.UnknownCounterReset } @@ -140,13 +154,13 @@ func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHi return histogram.NotCounterReset } -func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { +func (f *HistogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { if h.CounterResetHint != histogram.UnknownCounterReset { return h.CounterResetHint } var prevFH *histogram.FloatHistogram - if f.lastH == nil { - if f.lastFH == nil { + if f.lastH == nil || !f.currentSeriesRead { + if f.lastFH == nil || !f.currentSeriesRead { // We don't know if there's a counter reset. return histogram.UnknownCounterReset } diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go index abe3436a5a..cc570c9730 100644 --- a/promql/histogram_stats_iterator_test.go +++ b/promql/histogram_stats_iterator_test.go @@ -33,7 +33,7 @@ func TestHistogramStatsDecoding(t *testing.T) { expectedHints []histogram.CounterResetHint }{ { - name: "unknown counter reset triggers detection", + name: "unknown counter reset for later sample triggers detection", histograms: []*histogram.Histogram{ tsdbutil.GenerateTestHistogramWithHint(0, histogram.NotCounterReset), tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset), @@ -47,6 +47,21 @@ func TestHistogramStatsDecoding(t *testing.T) { histogram.NotCounterReset, }, }, + { + name: "unknown counter reset for first sample does not trigger detection", + histograms: []*histogram.Histogram{ + tsdbutil.GenerateTestHistogramWithHint(0, histogram.UnknownCounterReset), + tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset), + tsdbutil.GenerateTestHistogramWithHint(2, histogram.CounterReset), + tsdbutil.GenerateTestHistogramWithHint(2, histogram.UnknownCounterReset), + }, + expectedHints: []histogram.CounterResetHint{ + histogram.UnknownCounterReset, + histogram.NotCounterReset, + histogram.CounterReset, + histogram.NotCounterReset, + }, + }, { name: "stale sample before unknown reset hint", histograms: []*histogram.Histogram{ @@ -100,42 +115,62 @@ func TestHistogramStatsDecoding(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { t.Run("histogram_stats", func(t *testing.T) { - decodedStats := make([]*histogram.Histogram, 0) - statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) - for statsIterator.Next() != chunkenc.ValNone { - _, h := statsIterator.AtHistogram(nil) - decodedStats = append(decodedStats, h) - } - for i := 0; i < len(tc.histograms); i++ { - require.Equalf(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, "mismatch in counter reset hint for histogram %d", i) - h := tc.histograms[i] - if value.IsStaleNaN(h.Sum) { - require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) - require.Equal(t, uint64(0), decodedStats[i].Count) - } else { - require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count) - require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum) + check := func(statsIterator *HistogramStatsIterator) { + decodedStats := make([]*histogram.Histogram, 0) + + for statsIterator.Next() != chunkenc.ValNone { + _, h := statsIterator.AtHistogram(nil) + decodedStats = append(decodedStats, h) + } + + for i := 0; i < len(tc.histograms); i++ { + require.Equalf(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, "mismatch in counter reset hint for histogram %d", i) + h := tc.histograms[i] + if value.IsStaleNaN(h.Sum) { + require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) + require.Equal(t, uint64(0), decodedStats[i].Count) + } else { + require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count) + require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum) + } } } + + // Check that we get the expected results with a fresh iterator. + statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) + check(statsIterator) + + // Check that we get the same results if we reset and reuse that iterator. + statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil)) + check(statsIterator) }) t.Run("float_histogram_stats", func(t *testing.T) { - decodedStats := make([]*histogram.FloatHistogram, 0) - statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) - for statsIterator.Next() != chunkenc.ValNone { - _, h := statsIterator.AtFloatHistogram(nil) - decodedStats = append(decodedStats, h) - } - for i := 0; i < len(tc.histograms); i++ { - require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) - fh := tc.histograms[i].ToFloat(nil) - if value.IsStaleNaN(fh.Sum) { - require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) - require.Equal(t, float64(0), decodedStats[i].Count) - } else { - require.Equal(t, fh.Count, decodedStats[i].Count) - require.Equal(t, fh.Sum, decodedStats[i].Sum) + check := func(statsIterator *HistogramStatsIterator) { + decodedStats := make([]*histogram.FloatHistogram, 0) + for statsIterator.Next() != chunkenc.ValNone { + _, h := statsIterator.AtFloatHistogram(nil) + decodedStats = append(decodedStats, h) + } + for i := 0; i < len(tc.histograms); i++ { + require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) + fh := tc.histograms[i].ToFloat(nil) + if value.IsStaleNaN(fh.Sum) { + require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) + require.Equal(t, float64(0), decodedStats[i].Count) + } else { + require.Equal(t, fh.Count, decodedStats[i].Count) + require.Equal(t, fh.Sum, decodedStats[i].Sum) + } } } + + // Check that we get the expected results with a fresh iterator. + statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) + check(statsIterator) + + // Check that we get the same results if we reset and reuse that iterator. + statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil)) + check(statsIterator) }) }) }