refactor(scrape): rename sl -> app, slApp -> app

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-12-16 13:59:10 +00:00
parent 10624b9e97
commit 770dc3b69d
2 changed files with 200 additions and 200 deletions

View file

@ -1315,20 +1315,20 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
var total, added, seriesAdded, bytesRead int
var err, appErr, scrapeErr error
sla := sl.appender()
app := sl.appender()
defer func() {
if err != nil {
_ = sla.Rollback()
_ = app.Rollback()
return
}
err = sla.Commit()
err = app.Commit()
if err != nil {
sl.l.Error("Scrape commit failed", "err", err)
}
}()
defer func() {
if err = sl.report(sla, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
sl.l.Warn("Appending scrape report failed", "err", err)
}
}()
@ -1336,9 +1336,9 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
if forcedErr := sl.getForcedError(); forcedErr != nil {
scrapeErr = forcedErr
// Add stale markers.
if _, _, _, err := sla.append([]byte{}, "", appendTime); err != nil {
_ = sla.Rollback()
sla = sl.appender()
if _, _, _, err := app.append([]byte{}, "", appendTime); err != nil {
_ = app.Rollback()
app = sl.appender()
sl.l.Warn("Append failed", "err", err)
}
if errc != nil {
@ -1394,16 +1394,16 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr = sla.append(b, contentType, appendTime)
total, added, seriesAdded, appErr = app.append(b, contentType, appendTime)
if appErr != nil {
_ = sla.Rollback()
sla = sl.appender()
_ = app.Rollback()
app = sl.appender()
sl.l.Debug("Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sla.append([]byte{}, "", appendTime); err != nil {
_ = sla.Rollback()
sla = sl.appender()
if _, _, _, err := app.append([]byte{}, "", appendTime); err != nil {
_ = app.Rollback()
app = sl.appender()
sl.l.Warn("Append failed", "err", err)
}
}
@ -1473,24 +1473,24 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
// sl.context would have been cancelled, hence using sl.appenderCtx.
sla := sl.appender()
app := sl.appender()
var err error
defer func() {
if err != nil {
_ = sla.Rollback()
_ = app.Rollback()
return
}
err = sla.Commit()
err = app.Commit()
if err != nil {
sl.l.Warn("Stale commit failed", "err", err)
}
}()
if _, _, _, err = sla.append([]byte{}, "", staleTime); err != nil {
_ = sla.Rollback()
sla = sl.appender()
if _, _, _, err = app.append([]byte{}, "", staleTime); err != nil {
_ = app.Rollback()
app = sl.appender()
sl.l.Warn("Stale append failed", "err", err)
}
if err = sl.reportStale(sla, staleTime); err != nil {
if err = sl.reportStale(app, staleTime); err != nil {
sl.l.Warn("Stale report failed", "err", err)
}
}
@ -2036,7 +2036,7 @@ var (
}
)
func (sl *scrapeLoop) report(sla scrapeLoopAppendAdapter, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
func (sl *scrapeLoop) report(app scrapeLoopAppendAdapter, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
sl.scraper.Report(start, duration, scrapeErr)
ts := timestamp.FromTime(start)
@ -2047,63 +2047,63 @@ func (sl *scrapeLoop) report(sla scrapeLoopAppendAdapter, start time.Time, durat
}
b := labels.NewBuilderWithSymbolTable(sl.symbolTable)
if err = sla.addReportSample(scrapeHealthMetric, ts, health, b, false); err != nil {
if err = app.addReportSample(scrapeHealthMetric, ts, health, b, false); err != nil {
return err
}
if err = sla.addReportSample(scrapeDurationMetric, ts, duration.Seconds(), b, false); err != nil {
if err = app.addReportSample(scrapeDurationMetric, ts, duration.Seconds(), b, false); err != nil {
return err
}
if err = sla.addReportSample(scrapeSamplesMetric, ts, float64(scraped), b, false); err != nil {
if err = app.addReportSample(scrapeSamplesMetric, ts, float64(scraped), b, false); err != nil {
return err
}
if err = sla.addReportSample(samplesPostRelabelMetric, ts, float64(added), b, false); err != nil {
if err = app.addReportSample(samplesPostRelabelMetric, ts, float64(added), b, false); err != nil {
return err
}
if err = sla.addReportSample(scrapeSeriesAddedMetric, ts, float64(seriesAdded), b, false); err != nil {
if err = app.addReportSample(scrapeSeriesAddedMetric, ts, float64(seriesAdded), b, false); err != nil {
return err
}
if sl.reportExtraMetrics {
if err = sla.addReportSample(scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b, false); err != nil {
if err = app.addReportSample(scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b, false); err != nil {
return err
}
if err = sla.addReportSample(scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b, false); err != nil {
if err = app.addReportSample(scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b, false); err != nil {
return err
}
if err = sla.addReportSample(scrapeBodySizeBytesMetric, ts, float64(bytes), b, false); err != nil {
if err = app.addReportSample(scrapeBodySizeBytesMetric, ts, float64(bytes), b, false); err != nil {
return err
}
}
return err
}
func (sl *scrapeLoop) reportStale(sla scrapeLoopAppendAdapter, start time.Time) (err error) {
func (sl *scrapeLoop) reportStale(app scrapeLoopAppendAdapter, start time.Time) (err error) {
ts := timestamp.FromTime(start)
stale := math.Float64frombits(value.StaleNaN)
b := labels.NewBuilder(labels.EmptyLabels())
if err = sla.addReportSample(scrapeHealthMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeHealthMetric, ts, stale, b, true); err != nil {
return err
}
if err = sla.addReportSample(scrapeDurationMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeDurationMetric, ts, stale, b, true); err != nil {
return err
}
if err = sla.addReportSample(scrapeSamplesMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeSamplesMetric, ts, stale, b, true); err != nil {
return err
}
if err = sla.addReportSample(samplesPostRelabelMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(samplesPostRelabelMetric, ts, stale, b, true); err != nil {
return err
}
if err = sla.addReportSample(scrapeSeriesAddedMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeSeriesAddedMetric, ts, stale, b, true); err != nil {
return err
}
if sl.reportExtraMetrics {
if err = sla.addReportSample(scrapeTimeoutMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeTimeoutMetric, ts, stale, b, true); err != nil {
return err
}
if err = sla.addReportSample(scrapeSampleLimitMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeSampleLimitMetric, ts, stale, b, true); err != nil {
return err
}
if err = sla.addReportSample(scrapeBodySizeBytesMetric, ts, stale, b, true); err != nil {
if err = app.addReportSample(scrapeBodySizeBytesMetric, ts, stale, b, true); err != nil {
return err
}
}

View file

@ -135,17 +135,17 @@ func runScrapeLoopTest(t *testing.T, s *teststorage.TestStorage, expectOutOfOrde
timestampOutOfOrder := now.Add(-5 * time.Minute)
timestampInorder2 := now.Add(5 * time.Minute)
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(`metric_total{a="1",b="1"} 1`), "text/plain", timestampInorder1)
app := sl.appender()
_, _, _, err := app.append([]byte(`metric_total{a="1",b="1"} 1`), "text/plain", timestampInorder1)
require.NoError(t, err)
_, _, _, err = slApp.append([]byte(`metric_total{a="1",b="1"} 2`), "text/plain", timestampOutOfOrder)
_, _, _, err = app.append([]byte(`metric_total{a="1",b="1"} 2`), "text/plain", timestampOutOfOrder)
require.NoError(t, err)
_, _, _, err = slApp.append([]byte(`metric_total{a="1",b="1"} 3`), "text/plain", timestampInorder2)
_, _, _, err = app.append([]byte(`metric_total{a="1",b="1"} 3`), "text/plain", timestampInorder2)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
// Query the samples back from the storage.
q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
@ -219,10 +219,10 @@ test_metric2{foo="bar"} 22
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(scrape1), "application/openmetrics-text", now)
app := sl.appender()
_, _, _, err := app.append([]byte(scrape1), "application/openmetrics-text", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
testutil.RequireEqual(t, []sample{
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}},
@ -231,10 +231,10 @@ test_metric2{foo="bar"} 22
appTest.ResultReset()
// Next (the same) scrape should not new metadata entries.
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(scrape1), "application/openmetrics-text", now.Add(15*time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte(scrape1), "application/openmetrics-text", now.Add(15*time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
testutil.RequireEqual(t, []sample{
{L: labels.FromStrings("__name__", "test_metric_total")},
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar")},
@ -242,10 +242,10 @@ test_metric2{foo="bar"} 22
}, appTest.ResultMetadata())
appTest.ResultReset()
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(scrape2), "application/openmetrics-text", now.Add(15*time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte(scrape2), "application/openmetrics-text", now.Add(15*time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
testutil.RequireEqual(t, []sample{
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "different help text"}}, // Here, technically we should have no unit, but it's a known limitation of the current implementation.
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "metric2", Help: "other help text"}},
@ -257,11 +257,11 @@ test_metric2{foo="bar"} 22
func TestScrapeReportMetadata(t *testing.T) {
appTest := teststorage.NewAppendable()
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
slApp := sl.appender()
app := sl.appender()
now := time.Now()
require.NoError(t, sl.report(slApp, now, 2*time.Second, 1, 1, 1, 512, nil))
require.NoError(t, slApp.Commit())
require.NoError(t, sl.report(app, now, 2*time.Second, 1, 1, 1, 512, nil))
require.NoError(t, app.Commit())
testutil.RequireEqual(t, []sample{
{L: labels.FromStrings("__name__", "up"), M: scrapeHealthMetric.Metadata},
{L: labels.FromStrings("__name__", "scrape_duration_seconds"), M: scrapeDurationMetric.Metadata},
@ -1155,8 +1155,8 @@ func TestScrapeLoopRun_ContextCancelTerminatesBlockedSend(t *testing.T) {
func TestScrapeLoopMetadata(t *testing.T) {
sl, _ := newTestScrapeLoop(t)
slApp := sl.appender()
total, _, _, err := slApp.append([]byte(`# TYPE test_metric counter
app := sl.appender()
total, _, _, err := app.append([]byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
test_metric_total 1
@ -1164,7 +1164,7 @@ test_metric_total 1
# HELP test_metric_no_type other help text
# EOF`), "application/openmetrics-text", time.Now())
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 1, total)
md, ok := sl.cache.GetMetadata("test_metric")
@ -1189,17 +1189,17 @@ test_metric_total 1
func TestScrapeLoopSeriesAdded(t *testing.T) {
sl, _ := newTestScrapeLoop(t)
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 1, total)
require.Equal(t, 1, added)
require.Equal(t, 1, seriesAdded)
slApp = sl.appender()
total, added, seriesAdded, err = slApp.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, slApp.Commit())
app = sl.appender()
total, added, seriesAdded, err = app.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, app.Commit())
require.NoError(t, err)
require.Equal(t, 1, total)
require.Equal(t, 1, added)
@ -1223,10 +1223,10 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
}
})
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
require.ErrorContains(t, err, "invalid metric name or label names")
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, 1, total)
require.Equal(t, 0, added)
require.Equal(t, 0, seriesAdded)
@ -1237,10 +1237,10 @@ func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
sl.validationScheme = model.LegacyValidation
})
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("{\"test.metric\"} 1\n"), "text/plain", time.Time{})
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("{\"test.metric\"} 1\n"), "text/plain", time.Time{})
require.ErrorContains(t, err, "invalid metric name or label names")
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, 1, total)
require.Equal(t, 0, added)
require.Equal(t, 0, seriesAdded)
@ -1250,8 +1250,8 @@ func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
sl.validationScheme = model.UTF8Validation
})
slApp = sl.appender()
total, added, seriesAdded, err = slApp.append([]byte("{\"test.metric\"} 1\n"), "text/plain", time.Time{})
app = sl.appender()
total, added, seriesAdded, err = app.append([]byte("{\"test.metric\"} 1\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.Equal(t, 1, total)
require.Equal(t, 1, added)
@ -1377,14 +1377,14 @@ func BenchmarkScrapeLoopAppend(b *testing.B) {
b.Cleanup(func() { _ = s.Close() })
sl, _ := newTestScrapeLoop(b, withAppendable(s))
slApp := sl.appender()
app := sl.appender()
ts := time.Time{}
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
ts = ts.Add(time.Second)
_, _, _, err := slApp.append(bcase.parsable, bcase.contentType, ts)
_, _, _, err := app.append(bcase.parsable, bcase.contentType, ts)
if err != nil {
b.Fatal(err)
}
@ -1821,10 +1821,10 @@ func TestScrapeLoopAppend(t *testing.T) {
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(test.scrapeLabels), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte(test.scrapeLabels), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
expected := []sample{
{
@ -1914,11 +1914,11 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
}
})
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(tc.exposedLabels), "text/plain", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
app := sl.appender()
_, _, _, err := app.append([]byte(tc.exposedLabels), "text/plain", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
requireEqual(t, []sample{
{
@ -1952,10 +1952,10 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
sl.cache.addRef(metric, fakeRef, lset, hash)
now := time.Now()
slApp := sl.appender()
_, _, _, err = slApp.append(metric, "text/plain", now)
app := sl.appender()
_, _, _, err = app.append(metric, "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
expected := []sample{
{
@ -1996,10 +1996,10 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
beforeMetricValue := beforeMetric.GetCounter().GetValue()
now := time.Now()
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "text/plain", now)
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "text/plain", now)
require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 1, seriesAdded)
@ -2025,10 +2025,10 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
requireEqual(t, want, appTest.RolledbackSamples(), "Appended samples not as expected:\n%s", appTest)
now = time.Now()
slApp = sl.appender()
total, added, seriesAdded, err = slApp.append([]byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "text/plain", now)
app = sl.appender()
total, added, seriesAdded, err = app.append([]byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "text/plain", now)
require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, 9, total)
require.Equal(t, 6, added)
require.Equal(t, 1, seriesAdded)
@ -2047,7 +2047,7 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
return l
}
})
slApp := sl.appender()
app := sl.appender()
metric := dto.Metric{}
err := sl.metrics.targetScrapeNativeHistogramBucketLimit.Write(&metric)
@ -2082,7 +2082,7 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
require.NoError(t, err)
now := time.Now()
total, added, seriesAdded, err := slApp.append(msg, "application/vnd.google.protobuf", now)
total, added, seriesAdded, err := app.append(msg, "application/vnd.google.protobuf", now)
require.NoError(t, err)
require.Equal(t, 3, total)
require.Equal(t, 3, added)
@ -2105,7 +2105,7 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
require.NoError(t, err)
now = time.Now()
total, added, seriesAdded, err = slApp.append(msg, "application/vnd.google.protobuf", now)
total, added, seriesAdded, err = app.append(msg, "application/vnd.google.protobuf", now)
require.NoError(t, err)
require.Equal(t, 3, total)
require.Equal(t, 3, added)
@ -2128,11 +2128,11 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
require.NoError(t, err)
now = time.Now()
total, added, seriesAdded, err = slApp.append(msg, "application/vnd.google.protobuf", now)
total, added, seriesAdded, err = app.append(msg, "application/vnd.google.protobuf", now)
if !errors.Is(err, errBucketLimit) {
t.Fatalf("Did not see expected histogram bucket limit error: %s", err)
}
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded) // Series are cached.
@ -2154,15 +2154,15 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(`metric_a{a="1",b="1"} 1`), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte(`metric_a{a="1",b="1"} 1`), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(`metric_a{b="1",a="1"} 2`), "text/plain", now.Add(time.Minute))
app = sl.appender()
_, _, _, err = app.append([]byte(`metric_a{b="1",a="1"} 2`), "text/plain", now.Add(time.Minute))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -2186,8 +2186,8 @@ func TestScrapeLoopAppendFailsWithNoContentType(t *testing.T) {
})
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("metric_a 1\n"), "", now)
app := sl.appender()
_, _, _, err := app.append([]byte("metric_a 1\n"), "", now)
// We expected the appropriate error.
require.ErrorContains(t, err, "non-compliant scrape target sending blank Content-Type and no fallback_scrape_protocol specified for target", "Expected \"non-compliant scrape\" error but got: %s", err)
}
@ -2200,10 +2200,10 @@ func TestScrapeLoopAppendEmptyWithNoContentType(t *testing.T) {
})
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(""), "", now)
app := sl.appender()
_, _, _, err := app.append([]byte(""), "", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
}
func TestScrapeLoopAppendStaleness(t *testing.T) {
@ -2211,15 +2211,15 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("metric_a 1\n"), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte("metric_a 1\n"), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(""), "", now.Add(time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte(""), "", now.Add(time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -2240,15 +2240,15 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
appTest := teststorage.NewAppendable()
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("metric_a 1 1000\n"), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte("metric_a 1 1000\n"), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(""), "", now.Add(time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte(""), "", now.Add(time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -2268,15 +2268,15 @@ func TestScrapeLoopAppendStalenessIfTrackTimestampStaleness(t *testing.T) {
})
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("metric_a 1 1000\n"), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte("metric_a 1 1000\n"), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(""), "", now.Add(time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte(""), "", now.Add(time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -2809,7 +2809,7 @@ metric: <
// expectations.
sl.appendMetadataToWAL = false
})
slApp := sl.appender()
app := sl.appender()
now := time.Now()
@ -2834,9 +2834,9 @@ metric: <
buf.WriteString(test.scrapeText)
}
_, _, _, err := slApp.append(buf.Bytes(), test.contentType, now)
_, _, _, err := app.append(buf.Bytes(), test.contentType, now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
requireEqual(t, test.samples, appTest.ResultSamples())
})
}
@ -2904,10 +2904,10 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
}
for i, st := range scrapeText {
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(st), "application/openmetrics-text", timestamp.Time(samples[i].T))
app := sl.appender()
_, _, _, err := app.append([]byte(st), "application/openmetrics-text", timestamp.Time(samples[i].T))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
}
requireEqual(t, samples, appTest.ResultSamples())
@ -2963,10 +2963,10 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Unix(1, 0)
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "text/plain", now)
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -2992,10 +2992,10 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
))
now := time.Now().Add(20 * time.Minute)
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("normal 1\n"), "text/plain", now)
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("normal 1\n"), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 1, total)
require.Equal(t, 1, added)
require.Equal(t, 0, seriesAdded)
@ -3418,10 +3418,10 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(`metric_a{a="1",b="1"} 1 0`), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte(`metric_a{a="1",b="1"} 1 0`), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -3444,10 +3444,10 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
})
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(`metric_a{a="1",b="1"} 1 0`), "text/plain", now)
app := sl.appender()
_, _, _, err := app.append([]byte(`metric_a{a="1",b="1"} 1 0`), "text/plain", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -3467,10 +3467,10 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
// We add a good and a bad metric to check that both are discarded.
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "text/plain", time.Time{})
app := sl.appender()
_, _, _, err := app.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "text/plain", time.Time{})
require.Error(t, err)
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
// We need to cycle staleness cache maps after a manual rollback. Otherwise, they will have old entries in them,
// which would cause ErrDuplicateSampleForTimestamp errors on the next append.
sl.cache.iterDone(true)
@ -3482,10 +3482,10 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
require.NoError(t, series.Err())
// We add a good metric to check that it is recorded.
slApp = sl.appender()
_, _, _, err = slApp.append([]byte("test_metric{le=\"500\"} 1\n"), "text/plain", time.Time{})
app = sl.appender()
_, _, _, err = app.append([]byte("test_metric{le=\"500\"} 1\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
q, err = s.Querier(time.Time{}.UnixNano(), 0)
require.NoError(t, err)
@ -3510,10 +3510,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
}
})
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "text/plain", time.Time{})
app := sl.appender()
_, _, _, err := app.append([]byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "text/plain", time.Time{})
require.Error(t, err)
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, errNameLabelMandatory, err)
q, err := s.Querier(time.Time{}.UnixNano(), 0)
@ -3780,10 +3780,10 @@ func TestScrapeAddFast(t *testing.T) {
sl, _ := newTestScrapeLoop(t, withAppendable(s))
slApp := sl.appender()
_, _, _, err := slApp.append([]byte("up 1\n"), "text/plain", time.Time{})
app := sl.appender()
_, _, _, err := app.append([]byte("up 1\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
// Poison the cache. There is just one entry, and one series in the
// storage. Changing the ref will create a 'not found' error.
@ -3791,10 +3791,10 @@ func TestScrapeAddFast(t *testing.T) {
v.ref++
}
slApp = sl.appender()
_, _, _, err = slApp.append([]byte("up 1\n"), "text/plain", time.Time{}.Add(time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte("up 1\n"), "text/plain", time.Time{}.Add(time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
}
func TestReuseCacheRace(t *testing.T) {
@ -4080,15 +4080,15 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
sl.labelLimits = &test.labelLimits
})
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(test.scrapeLabels), "text/plain", time.Now())
app := sl.appender()
_, _, _, err := app.append([]byte(test.scrapeLabels), "text/plain", time.Now())
t.Logf("Test:%s", test.title)
if test.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
}
}
}
@ -4690,10 +4690,10 @@ metric: <
t.Error("unexpected content type")
}
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append(content, contentType, now)
app := sl.appender()
_, _, _, err := app.append(content, contentType, now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
var expectedSchema int32
if expectCustomBuckets {
@ -5087,29 +5087,29 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
sl, _ := newTestScrapeLoop(t)
slApp := sl.appender()
total, added, seriesAdded, err := slApp.append([]byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "text/plain", time.Time{})
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 1, seriesAdded)
require.Equal(t, 2.0, prom_testutil.ToFloat64(sl.metrics.targetScrapeSampleDuplicate))
slApp = sl.appender()
total, added, seriesAdded, err = slApp.append([]byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "text/plain", time.Time{})
app = sl.appender()
total, added, seriesAdded, err = app.append([]byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded)
require.Equal(t, 4.0, prom_testutil.ToFloat64(sl.metrics.targetScrapeSampleDuplicate))
// When different timestamps are supplied, multiple samples are accepted.
slApp = sl.appender()
total, added, seriesAdded, err = slApp.append([]byte("test_metric 1 1001\ntest_metric 1 1002\ntest_metric 1 1003\n"), "text/plain", time.Time{})
app = sl.appender()
total, added, seriesAdded, err = app.append([]byte("test_metric 1 1001\ntest_metric 1 1002\ntest_metric 1 1003\n"), "text/plain", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded)
@ -5492,21 +5492,21 @@ func TestScrapeAppendWithParseError(t *testing.T) {
sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
now := time.Now()
slApp := sl.appender()
_, _, _, err := slApp.append([]byte(scrape1), "application/openmetrics-text", now)
app := sl.appender()
_, _, _, err := app.append([]byte(scrape1), "application/openmetrics-text", now)
require.Error(t, err)
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
slApp = sl.appender()
_, _, _, err = slApp.append(nil, "application/openmetrics-text", now)
app = sl.appender()
_, _, _, err = app.append(nil, "application/openmetrics-text", now)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Empty(t, appTest.ResultSamples())
slApp = sl.appender()
_, _, _, err = slApp.append([]byte(scrape2), "application/openmetrics-text", now.Add(15*time.Second))
app = sl.appender()
_, _, _, err = app.append([]byte(scrape2), "application/openmetrics-text", now.Add(15*time.Second))
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
want := []sample{
{
@ -5530,15 +5530,15 @@ func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
})
now := time.Now()
slApp := sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err := slApp.append(
app := sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err := app.append(
// Start with 3 samples, all accepted.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 3, samplesScraped) // All on scrape.
require.Equal(t, 3, samplesAfterRelabel) // This is series after relabeling.
require.Equal(t, 3, createdSeries) // Newly added to TSDB.
@ -5559,34 +5559,34 @@ func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
V: 1,
},
}
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", slApp)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", app)
now = now.Add(time.Minute)
slApp = sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err = slApp.append(
app = sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err = app.append(
// Start exporting 3 more samples, so we're over the limit now.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\n"),
"text/plain",
now,
)
require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback())
require.NoError(t, app.Rollback())
require.Equal(t, 6, samplesScraped)
require.Equal(t, 6, samplesAfterRelabel)
require.Equal(t, 1, createdSeries) // We've added one series before hitting the limit.
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", slApp)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", app)
sl.cache.iterDone(false)
now = now.Add(time.Minute)
slApp = sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err = slApp.append(
app = sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err = app.append(
// Remove all samples except first 2.
[]byte("metric_a 1\nmetric_b 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 2, samplesScraped)
require.Equal(t, 2, samplesAfterRelabel)
require.Equal(t, 0, createdSeries)
@ -5618,7 +5618,7 @@ func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
V: math.Float64frombits(value.StaleNaN),
},
}...)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", slApp)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", app)
}
// This test covers a case where there's a target with sample_limit set and each scrape sees a completely
@ -5633,15 +5633,15 @@ func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
})
now := time.Now()
slApp := sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err := slApp.append(
app := sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err := app.append(
// Start with 4 samples, all accepted.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 4, samplesScraped) // All on scrape.
require.Equal(t, 4, samplesAfterRelabel) // This is series after relabeling.
require.Equal(t, 4, createdSeries) // Newly added to TSDB.
@ -5667,18 +5667,18 @@ func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
V: 1,
},
}
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", slApp)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", app)
now = now.Add(time.Minute)
slApp = sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err = slApp.append(
app = sl.appender()
samplesScraped, samplesAfterRelabel, createdSeries, err = app.append(
// Replace all samples with new time series.
[]byte("metric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.NoError(t, app.Commit())
require.Equal(t, 4, samplesScraped)
require.Equal(t, 4, samplesAfterRelabel)
require.Equal(t, 4, createdSeries)
@ -5728,7 +5728,7 @@ func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
V: math.Float64frombits(value.StaleNaN),
},
}...)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", slApp)
requireEqual(t, want, appTest.ResultSamples(), "Appended samples not as expected:\n%s", app)
}
func TestScrapeLoopDisableStalenessMarkerInjection(t *testing.T) {