diff --git a/docs/feature_flags.md b/docs/feature_flags.md index c3b7c64abc..a354d6bc18 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -114,9 +114,9 @@ Enables the use of start timestamps (ST) in PromQL functions such as `rate()`, ` Enables the synthesis of start timestamps (ST) for cumulative metrics (Counters, Classic Histograms, Native Histograms, and Summaries) when they are not provided by the source. Similar to [the official OpenTelemetry metricstarttimeprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/metricstarttimeprocessor#strategy-subtract-initial-point), it tracks previous values to detect resets and subtracts the initial reference point to synthesize a zero-based timeline from the first sample. > NOTE: -> * Synthesis is currently only implemented on scrape (not yet RW or Otel receive). -> * This feature does not work with out-of-order samples. -> * This leads to counter values being different than the source. The first point is dropped and its timestamp is used as the start timestamp for all subsequent points. All subsequent points are normalized against that dropped point (i.e. subtracted by it). Rates will still return accurate values but raw values of counters will not match the source. +> * Synthesis yields an accurate start timestamp while maintaining accurate counter rates. However, the raw counter values will be different than what's scraped. This is because the first point is dropped and its timestamp is used as the start timestamp for all subsequent points. All subsequent points are normalized against that dropped point (i.e. subtracted by it). Effectively, synthesis creates new counter streams with the known start timestamp from the original data. +> * Synthesis works only with scraped data (RW and Otel receiver are not implemented). +> * Synthesis requires ordered samples. As a result, cumulative samples without ST that are out of order will be rejected despite the `tsdb. 
out_of_order_time_window` setting. ## Concurrent evaluation of independent rules diff --git a/scrape/scrape_append_v2.go b/scrape/scrape_append_v2.go index b102fef4e1..e723c52744 100644 --- a/scrape/scrape_append_v2.go +++ b/scrape/scrape_append_v2.go @@ -319,6 +319,10 @@ loop: if !skipAppend { // Append sample to the storage. + if stCache != nil { + appOpts.RejectOutOfOrder = true + // Synthesis requires ordered samples (see docs/feature_flags.md), hence out-of-order appends are rejected whenever stCache is set. TODO(ridwanmsharif): better handle OOO metrics when st-synthesis is on. + } ref, err = app.Append(ref, lset, st, t, val, h, fh, appOpts) if err == nil && ce != nil && ref != 0 { ce.ref = ref diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index a7c7733ddd..4864426cd4 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1916,6 +1916,63 @@ test_metric 32 requireSample(t, got[8], "test_metric", 7, timestamp.FromTime(tsF), timestamp.FromTime(tsE), false) } +func TestScrapeLoopAppend_StartTimeSynthesis_OutOfOrder(t *testing.T) { + ts := time.Now() + + s := teststorage.New(t) + // Simulate an out-of-order (OOO) append error for "test_metric"; returnOOO is flipped to true before the second scrape. + var returnOOO bool + appTest := teststorage.NewAppendable().WithErrs( + func(ls labels.Labels) error { + if returnOOO && ls.Get(model.MetricNameLabel) == "test_metric" { + return storage.ErrOutOfOrderSample + } + return nil + }, nil, nil).Then(s) + + sl, _ := newTestScrapeLoop(t, withAppendable(appTest, true), func(sl *scrapeLoop) { + sl.synthesizeST = true + sl.parseST = true + }) + + // First Scrape: anchor the start time, append is skipped. + scrapeA := []byte(`# TYPE test_metric counter +test_metric 10 +# EOF +`) + + app := sl.appender() + _, _, _, err := app.append(scrapeA, "application/openmetrics-text", ts) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Counter should be skipped (anchored). + got := appTest.ResultSamples() + require.Empty(t, got) + + // Second Scrape: Counter should be rejected due to OOO. + // stCache is now non-nil for this series. 
+ ts2 := ts.Add(time.Second) + scrapeB := []byte(`# TYPE test_metric counter +test_metric 15 +# EOF +`) + + returnOOO = true // Now return OOO error + app = sl.appender() + total, added, seriesAdded, err := app.append(scrapeB, "application/openmetrics-text", ts2) + require.NoError(t, err) // Scrape should not fail + require.NoError(t, app.Commit()) + + // Counter should NOT be in the result samples because it was dropped. + got = appTest.ResultSamples() + require.Empty(t, got) + + require.Equal(t, 1, total) + require.Equal(t, 1, added) + require.Equal(t, 0, seriesAdded) +} + func requireSampleHist(t *testing.T, s teststorage.Sample, name, expectedHist string, ts, st int64, isNaN bool) { t.Helper() require.Equal(t, name, s.L.Get(model.MetricNameLabel))