mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-10 17:10:12 -04:00
scrape: clear synthesis state on failure and fix histogram copy
- Clear stCache state in scrape loop when append fails for existing series. - Copy float histogram before storing in cache to avoid mutation. - Add test for state mutation on OOO failure. - Update docs to reflect behavior on failure. Signed-off-by: Ridwan Sharif <ridwanmsharif@google.com>
This commit is contained in:
parent
abf3726f8a
commit
8bcf91c370
4 changed files with 74 additions and 3 deletions
|
|
@ -118,6 +118,7 @@ Enables the synthesis of start timestamps (ST) for cumulative metrics (Counters,
|
|||
> * Synthesis yields accurate Start Timestamp while maintaining accurate counter rates. However, the raw counter values will be different that what's scraped. This is because the first point is dropped and its timestamp is used as the start timestamp for all subsequent points. All subsequent points are normalized against that dropped point (i.e. subtracted by it). Effectively, synthesis create new counter streams with the known start timestamp from the original data.
|
||||
> * Synthesis works only with scraped data (RW and Otel receiver are not implemented).
|
||||
> * Synthesis requires ordered samples. As a result, cumulative samples without ST that are out of order will be rejected despite the `tsdb. out_of_order_time_window` setting.
|
||||
> * If an append fails for a series (e.g., due to out-of-order samples being rejected), the synthesis state for that series is cleared. As a result, the next sample received after the failure will be treated as the first sample again and will be dropped to establish a new reference point.
|
||||
|
||||
## Concurrent evaluation of independent rules
|
||||
|
||||
|
|
|
|||
|
|
@ -358,7 +358,11 @@ loop:
|
|||
}
|
||||
|
||||
if ce != nil && sl.synthesizeST {
|
||||
ce.st = stCache // Set it, even if it's nil (explicit reset).
|
||||
if shouldCache {
|
||||
ce.st = stCache // Set it, even if it's nil (explicit reset).
|
||||
} else if seriesCached {
|
||||
ce.st = nil // Clear state on failure for existing series.
|
||||
}
|
||||
}
|
||||
|
||||
// We track staleness for a series to ensure that if it disappears in a future scrape,
|
||||
|
|
|
|||
|
|
@ -1990,6 +1990,72 @@ test_metric 15
|
|||
require.Empty(t, got, "Expected no samples (specifically no stale markers) because the series was never tracked for staleness")
|
||||
}
|
||||
|
||||
func TestScrapeLoopAppend_StartTimeSynthesis_OOO_StateMutation(t *testing.T) {
|
||||
ts := time.Now()
|
||||
|
||||
s := teststorage.New(t)
|
||||
var returnOOO bool
|
||||
appTest := teststorage.NewAppendable().WithErrs(
|
||||
func(ls labels.Labels) error {
|
||||
if returnOOO && ls.Get(model.MetricNameLabel) == "test_metric" {
|
||||
return storage.ErrOutOfOrderSample
|
||||
}
|
||||
return nil
|
||||
}, nil, nil).Then(s)
|
||||
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(appTest, true), func(sl *scrapeLoop) {
|
||||
sl.synthesizeST = true
|
||||
sl.parseST = true
|
||||
})
|
||||
|
||||
// First Scrape: anchor the start time, append is skipped.
|
||||
scrapeA := []byte(`# TYPE test_metric counter
|
||||
test_metric 10
|
||||
# EOF
|
||||
`)
|
||||
|
||||
app := sl.appender()
|
||||
_, _, _, err := app.append(scrapeA, "application/openmetrics-text", ts)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Second Scrape: Counter should be rejected due to OOO.
|
||||
// We pass a large value to trigger false reset on next scrape if state leaks.
|
||||
ts2 := ts.Add(time.Second)
|
||||
scrapeB := []byte(`# TYPE test_metric counter
|
||||
test_metric 100
|
||||
# EOF
|
||||
`)
|
||||
|
||||
returnOOO = true // Now return OOO error
|
||||
app = sl.appender()
|
||||
_, _, _, err = app.append(scrapeB, "application/openmetrics-text", ts2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Third Scrape: Counter is valid again.
|
||||
// Since the previous scrape failed with OOO, we cleared the state (ce.st = nil).
|
||||
// This makes this scrape act like a first sample again, so it will be dropped to establish a new reference point.
|
||||
ts3 := ts2.Add(time.Second)
|
||||
scrapeC := []byte(`# TYPE test_metric counter
|
||||
test_metric 25
|
||||
# EOF
|
||||
`)
|
||||
|
||||
returnOOO = false // No error now
|
||||
app = sl.appender()
|
||||
_, _, _, err = app.append(scrapeC, "application/openmetrics-text", ts3)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
got := appTest.ResultSamples()
|
||||
// We expect 0 samples because:
|
||||
// - 1st scrape: anchored (dropped)
|
||||
// - 2nd scrape: OOO (dropped)
|
||||
// - 3rd scrape: fresh start after cleared state (dropped)
|
||||
require.Empty(t, got, "Expected no samples because the state was cleared and the sample was used to re-anchor")
|
||||
}
|
||||
|
||||
func requireSampleHist(t *testing.T, s teststorage.Sample, name, expectedHist string, ts, st int64, isNaN bool) {
|
||||
t.Helper()
|
||||
require.Equal(t, name, s.L.Get(model.MetricNameLabel))
|
||||
|
|
|
|||
|
|
@ -185,10 +185,10 @@ func (c *stCache) synthesizeFloatHistogram(fh *histogram.FloatHistogram, st int6
|
|||
return fh, c.st, false
|
||||
}
|
||||
|
||||
n.prev = fh
|
||||
n.prev = fh.Copy()
|
||||
|
||||
// Subtract the origin anchor.
|
||||
adjusted, _, _, _ := fh.Copy().Sub(n.starting)
|
||||
adjusted, _, _, _ := fh.Sub(n.starting)
|
||||
adjusted = adjusted.Compact(0)
|
||||
|
||||
return adjusted, c.st, false
|
||||
|
|
|
|||
Loading…
Reference in a new issue