From 06b9df65ec782df6e7c0be4bb3e3d4c16a255621 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 13 Dec 2016 16:18:17 +0000 Subject: [PATCH] Refactor and add unittests to scrape result handling. --- retrieval/scrape.go | 31 ++++++----- retrieval/scrape_test.go | 112 +++++++++++++++++++++++++++++++++++++++ retrieval/target.go | 2 +- 3 files changed, 132 insertions(+), 13 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index ad608a1058..76c8631ea3 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -427,21 +427,11 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { } samples, err := sl.scraper.scrape(scrapeCtx, start) - if err == nil { - // Collect samples post-relabelling and label handling in a buffer. - buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))} - app := sl.mutator(buf) - for _, sample := range samples { - app.Append(sample) - } - - // Send samples to storage. - sl.append(buf.buffer) - } else if errc != nil { + err = sl.processScrapeResult(samples, err, start) + if err != nil && errc != nil { errc <- err } - sl.report(start, time.Since(start), len(samples), err) last = start } else { targetSkippedScrapes.WithLabelValues(interval.String()).Inc() @@ -460,6 +450,23 @@ func (sl *scrapeLoop) stop() { <-sl.done } +func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error, start time.Time) error { + if scrapeErr == nil { + // Collect samples post-relabelling and label handling in a buffer. + buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))} + app := sl.mutator(buf) + for _, sample := range samples { + app.Append(sample) + } + + // Send samples to storage. + sl.append(buf.buffer) + } + + sl.report(start, time.Since(start), len(samples), scrapeErr) + return scrapeErr +} + func (sl *scrapeLoop) append(samples model.Samples) { var ( numOutOfOrder = 0 diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index db1a1bbc99..6134d52af7 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -299,6 +299,118 @@ func TestScrapePoolSampleAppender(t *testing.T) { } } +func TestScrapeLoopSampleProcessing(t *testing.T) { + readSamples := model.Samples{ + { + Metric: model.Metric{"__name__": "a_metric"}, + }, + { + Metric: model.Metric{"__name__": "b_metric"}, + }, + } + + testCases := []struct { + scrapedSamples model.Samples + scrapeError error + metricRelabelConfigs []*config.RelabelConfig + expectedReportedSamples model.Samples + expectedIngestedSamplesCount int + }{ + { + scrapedSamples: readSamples, + scrapeError: nil, + metricRelabelConfigs: []*config.RelabelConfig{}, + expectedReportedSamples: model.Samples{ + { + Metric: model.Metric{"__name__": "up"}, + Value: 1, + }, + { + Metric: model.Metric{"__name__": "scrape_duration_seconds"}, + }, + { + Metric: model.Metric{"__name__": "scrape_samples_scraped"}, + Value: 2, + }, + }, + expectedIngestedSamplesCount: 2, + }, + { + scrapedSamples: readSamples, + scrapeError: nil, + metricRelabelConfigs: []*config.RelabelConfig{ + { + Action: config.RelabelDrop, + SourceLabels: model.LabelNames{"__name__"}, + Regex: config.MustNewRegexp("a.*"), + }, + }, + expectedReportedSamples: model.Samples{ + { + Metric: model.Metric{"__name__": "up"}, + Value: 1, + }, + { + Metric: model.Metric{"__name__": "scrape_duration_seconds"}, + }, + { + Metric: model.Metric{"__name__": "scrape_samples_scraped"}, + Value: 2, + }, + }, + expectedIngestedSamplesCount: 1, + }, + { + scrapedSamples: model.Samples{}, + scrapeError: fmt.Errorf("error"), + metricRelabelConfigs: []*config.RelabelConfig{}, + expectedReportedSamples: model.Samples{ + { + Metric: model.Metric{"__name__": "up"}, + Value: 0, + }, + { + Metric: model.Metric{"__name__": "scrape_duration_seconds"}, + }, + { + Metric: model.Metric{"__name__": "scrape_samples_scraped"}, + Value: 0, + }, + }, + expectedIngestedSamplesCount: 0, + }, + } + + for _, test := range testCases { + ingestedSamples := &bufferAppender{buffer: model.Samples{}} + reportedSamples := &bufferAppender{buffer: model.Samples{}} + + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: test.metricRelabelConfigs, + } + + sp := newScrapePool(context.Background(), cfg, ingestedSamples) + + scraper := &testScraper{} + sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples).(*scrapeLoop) + sl.processScrapeResult(test.scrapedSamples, test.scrapeError, time.Unix(0, 0)) + + // Ignore value of scrape_duration_seconds, as it's time dependant. + reportedSamples.buffer[1].Value = 0 + + if !reflect.DeepEqual(reportedSamples.buffer, test.expectedReportedSamples) { + t.Errorf("Reported samples did not match expected metrics") + t.Errorf("Expected: %v", test.expectedReportedSamples) + t.Fatalf("Got: %v", reportedSamples.buffer) + } + if test.expectedIngestedSamplesCount != len(ingestedSamples.buffer) { + t.Fatalf("Ingested samples %d did not match expected value %d", len(ingestedSamples.buffer), test.expectedIngestedSamplesCount) + } + } + +} + func TestScrapeLoopStop(t *testing.T) { scraper := &testScraper{} sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil) diff --git a/retrieval/target.go b/retrieval/target.go index 094d856c19..4a0b94bdf3 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -284,7 +284,7 @@ type bufferAppender struct { buffer model.Samples } -func (app bufferAppender) Append(s *model.Sample) error { +func (app *bufferAppender) Append(s *model.Sample) error { app.buffer = append(app.buffer, s) return nil }