diff --git a/scrape/scrape.go b/scrape/scrape.go index aa2d5538b1..9bff47eebc 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1512,13 +1512,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, loop: for { var ( - et textparse.Entry - sampleAdded, isHistogram bool - met []byte - parsedTimestamp *int64 - val float64 - h *histogram.Histogram - fh *histogram.FloatHistogram + et textparse.Entry + sampleAdded, isHistogram, seriesAlreadyScraped bool + met []byte + parsedTimestamp *int64 + val float64 + h *histogram.Histogram + fh *histogram.FloatHistogram ) if et, err = p.Next(); err != nil { if errors.Is(err, io.EOF) { @@ -1573,6 +1573,7 @@ loop: if ok { ref = ce.ref lset = ce.lset + hash = ce.hash // Update metadata only if it changed in the current iteration. updateMetadata(lset, false) @@ -1609,24 +1610,30 @@ loop: updateMetadata(lset, true) } - if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil { - ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) - if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. - // CT is an experimental feature. For now, we don't need to fail the - // scrape on errors updating the created timestamp, log debug. - level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + _, seriesAlreadyScraped = sl.cache.seriesCur[hash] + if seriesAlreadyScraped { + err = storage.ErrDuplicateSampleForTimestamp + } else { + if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil { + ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. + // CT is an experimental feature. For now, we don't need to fail the + // scrape on errors updating the created timestamp, log debug. 
+ level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + } + } + + if isHistogram { + if h != nil { + ref, err = app.AppendHistogram(ref, lset, t, h, nil) + } else { + ref, err = app.AppendHistogram(ref, lset, t, nil, fh) + } + } else { + ref, err = app.Append(ref, lset, t, val) } } - if isHistogram { - if h != nil { - ref, err = app.AppendHistogram(ref, lset, t, h, nil) - } else { - ref, err = app.AppendHistogram(ref, lset, t, nil, fh) - } - } else { - ref, err = app.Append(ref, lset, t, val) - } sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs) if err != nil { if !errors.Is(err, storage.ErrNotFound) { @@ -1648,6 +1655,8 @@ loop: // Increment added even if there's an error so we correctly report the // number of samples remaining after relabeling. + // We still report duplicated samples here since this number should be the exact number + // of time series exposed on a scrape after relabelling. added++ exemplars = exemplars[:0] // Reset and reuse the exemplar slice. for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index e37d091aec..4732fbe0d0 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -3600,3 +3600,31 @@ func BenchmarkTargetScraperGzip(b *testing.B) { }) } } + +// When a scrape contains multiple instances for the same time series we should increment +// prometheus_target_scrapes_sample_duplicate_timestamp_total metric. 
+func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
+	ctx, sl := simpleTestScrapeLoop(t)
+
+	// Every scrape exposes the same series three times; only the first scrape reports it as added.
+	for _, scrape := range []struct {
+		payload         string
+		wantSeriesAdded int
+	}{
+		{payload: "test_metric 1\ntest_metric 2\ntest_metric 3\n", wantSeriesAdded: 1},
+		{payload: "test_metric 1\ntest_metric 1\ntest_metric 1\n", wantSeriesAdded: 0},
+	} {
+		app := sl.appender(ctx)
+		total, added, seriesAdded, err := sl.append(app, []byte(scrape.payload), "", time.Time{})
+		require.NoError(t, err)
+		require.NoError(t, app.Commit())
+		require.Equal(t, 3, total)
+		require.Equal(t, 3, added)
+		require.Equal(t, scrape.wantSeriesAdded, seriesAdded)
+	}
+
+	// Four duplicates are expected overall (presumably the 2nd and 3rd sample of each scrape).
+	var duplicates dto.Metric
+	require.NoError(t, sl.metrics.targetScrapeSampleDuplicate.Write(&duplicates))
+	require.Equal(t, 4.0, duplicates.GetCounter().GetValue())
+}