From 8bcf91c370765ee08450ada2d68c0ec9d60672fd Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Thu, 16 Apr 2026 14:53:17 +0000 Subject: [PATCH] scrape: clear synthesis state on failure and fix histogram copy - Clear stCache state in scrape loop when append fails for existing series. - Copy float histogram before storing in cache to avoid mutation. - Add test for state mutation on OOO failure. - Update docs to reflect behavior on failure. Signed-off-by: Ridwan Sharif --- docs/feature_flags.md | 1 + scrape/scrape_append_v2.go | 6 +++- scrape/scrape_test.go | 66 ++++++++++++++++++++++++++++++++++++++ scrape/st_synthesis.go | 4 +-- 4 files changed, 74 insertions(+), 3 deletions(-) diff --git a/docs/feature_flags.md b/docs/feature_flags.md index ebece7fabc..df596cd2cc 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -118,6 +118,7 @@ Enables the synthesis of start timestamps (ST) for cumulative metrics (Counters, > * Synthesis yields accurate Start Timestamp while maintaining accurate counter rates. However, the raw counter values will be different that what's scraped. This is because the first point is dropped and its timestamp is used as the start timestamp for all subsequent points. All subsequent points are normalized against that dropped point (i.e. subtracted by it). Effectively, synthesis create new counter streams with the known start timestamp from the original data. > * Synthesis works only with scraped data (RW and Otel receiver are not implemented). > * Synthesis requires ordered samples. As a result, cumulative samples without ST that are out of order will be rejected despite the `tsdb. out_of_order_time_window` setting. +> * If an append fails for a series (e.g., due to out-of-order samples being rejected), the synthesis state for that series is cleared. As a result, the next sample received after the failure will be treated as the first sample again and will be dropped to establish a new reference point. ## Concurrent evaluation of independent rules diff --git a/scrape/scrape_append_v2.go b/scrape/scrape_append_v2.go index 3c96549699..30b490ba0e 100644 --- a/scrape/scrape_append_v2.go +++ b/scrape/scrape_append_v2.go @@ -358,7 +358,11 @@ loop: } if ce != nil && sl.synthesizeST { - ce.st = stCache // Set it, even if it's nil (explicit reset). + if shouldCache { + ce.st = stCache // Set it, even if it's nil (explicit reset). + } else if seriesCached { + ce.st = nil // Clear state on failure for existing series. + } } // We track staleness for a series to ensure that if it disappears in a future scrape, diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 50e3130847..06328cc226 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1990,6 +1990,72 @@ test_metric 15 require.Empty(t, got, "Expected no samples (specifically no stale markers) because the series was never tracked for staleness") } +func TestScrapeLoopAppend_StartTimeSynthesis_OOO_StateMutation(t *testing.T) { + ts := time.Now() + + s := teststorage.New(t) + var returnOOO bool + appTest := teststorage.NewAppendable().WithErrs( + func(ls labels.Labels) error { + if returnOOO && ls.Get(model.MetricNameLabel) == "test_metric" { + return storage.ErrOutOfOrderSample + } + return nil + }, nil, nil).Then(s) + + sl, _ := newTestScrapeLoop(t, withAppendable(appTest, true), func(sl *scrapeLoop) { + sl.synthesizeST = true + sl.parseST = true + }) + + // First Scrape: anchor the start time, append is skipped. + scrapeA := []byte(`# TYPE test_metric counter +test_metric 10 +# EOF +`) + + app := sl.appender() + _, _, _, err := app.append(scrapeA, "application/openmetrics-text", ts) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Second Scrape: Counter should be rejected due to OOO. + // We pass a large value to trigger false reset on next scrape if state leaks. + ts2 := ts.Add(time.Second) + scrapeB := []byte(`# TYPE test_metric counter +test_metric 100 +# EOF +`) + + returnOOO = true // Now return OOO error + app = sl.appender() + _, _, _, err = app.append(scrapeB, "application/openmetrics-text", ts2) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Third Scrape: Counter is valid again. + // Since the previous scrape failed with OOO, we cleared the state (ce.st = nil). + // This makes this scrape act like a first sample again, so it will be dropped to establish a new reference point. + ts3 := ts2.Add(time.Second) + scrapeC := []byte(`# TYPE test_metric counter +test_metric 25 +# EOF +`) + + returnOOO = false // No error now + app = sl.appender() + _, _, _, err = app.append(scrapeC, "application/openmetrics-text", ts3) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + got := appTest.ResultSamples() + // We expect 0 samples because: + // - 1st scrape: anchored (dropped) + // - 2nd scrape: OOO (dropped) + // - 3rd scrape: fresh start after cleared state (dropped) + require.Empty(t, got, "Expected no samples because the state was cleared and the sample was used to re-anchor") +} + func requireSampleHist(t *testing.T, s teststorage.Sample, name, expectedHist string, ts, st int64, isNaN bool) { t.Helper() require.Equal(t, name, s.L.Get(model.MetricNameLabel)) diff --git a/scrape/st_synthesis.go b/scrape/st_synthesis.go index 5643415266..8440ba2add 100644 --- a/scrape/st_synthesis.go +++ b/scrape/st_synthesis.go @@ -185,10 +185,10 @@ func (c *stCache) synthesizeFloatHistogram(fh *histogram.FloatHistogram, st int6 return fh, c.st, false } - n.prev = fh + n.prev = fh.Copy() // Subtract the origin anchor. - adjusted, _, _, _ := fh.Copy().Sub(n.starting) + adjusted, _, _, _ := fh.Sub(n.starting) adjusted = adjusted.Compact(0) return adjusted, c.st, false