From 05de8b7f8d43bc37b7198cad16e9b4760ba255ff Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 22 Feb 2016 16:46:55 +0100 Subject: [PATCH] Extract target scraping into scrape loop. This commit factors out the scrape loop handling into its own data structure. For the transition it will be directly attached to the target. --- retrieval/scrape.go | 325 ++++++++++++++++++++++++++++++++----- retrieval/target.go | 219 +++---------------------- retrieval/targetmanager.go | 101 ------------ 3 files changed, 305 insertions(+), 340 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 40d398dedd..da6fa5d8a8 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -13,65 +13,302 @@ package retrieval -// import ( -// "sync" -// "time" +import ( + "errors" + "sync" + "time" -// "github.com/prometheus/common/log" -// "github.com/prometheus/common/model" -// "golang.org/x/net/context" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "golang.org/x/net/context" -// "github.com/prometheus/prometheus/config" -// "github.com/prometheus/prometheus/storage" -// ) + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" +) -// type scraper interface { -// scrape(context.Context) error -// report(start time.Time, dur time.Duration, err error) error -// } +const ( + scrapeHealthMetricName = "up" + scrapeDurationMetricName = "scrape_duration_seconds" -// type scrapePool struct { -// mtx sync.RWMutex -// targets map[model.Fingerprint]*Target -// loops map[model.Fingerprint]loop + // Capacity of the channel to buffer samples during ingestion. + ingestedSamplesCap = 256 -// config *config.ScrapeConfig + // Constants for instrumentation. + namespace = "prometheus" + interval = "interval" +) -// newLoop func(context.Context) -// } +var ( + errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") -// func newScrapePool(c *config.ScrapeConfig) *scrapePool { -// return &scrapePool{config: c} -// } + targetIntervalLength = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "target_interval_length_seconds", + Help: "Actual intervals between scrapes.", + Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, + }, + []string{interval}, + ) + targetSkippedScrapes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "target_skipped_scrapes_total", + Help: "Total number of scrapes that were skipped because the metric storage was throttled.", + }, + []string{interval}, + ) +) -// func (sp *scrapePool) sync(targets []*Target) { -// sp.mtx.Lock() -// defer sp.mtx.Unlock() +func init() { + prometheus.MustRegister(targetIntervalLength) + prometheus.MustRegister(targetSkippedScrapes) +} -// uniqueTargets := make(map[string]*Target{}, len(targets)) +// A scraper retrieves samples and accepts a status report at the end. +type scraper interface { + scrape(context.Context) (model.Samples, error) + report(start time.Time, dur time.Duration, err error) + offset(interval time.Duration) time.Duration +} -// for _, t := range targets { -// uniqueTargets[t.fingerprint()] = t -// } +// scrapePool manages scrapes for sets of targets. +type scrapePool struct { + appender storage.SampleAppender -// sp.targets = uniqueTargets -// } + ctx context.Context + mtx sync.RWMutex + tgroups map[string]map[model.Fingerprint]*Target -// type scrapeLoop struct { -// scraper scraper -// mtx sync.RWMutex -// } + targets map[model.Fingerprint]loop +} -// func newScrapeLoop(ctx context.Context) +func newScrapePool(app storage.SampleAppender) *scrapePool { + return &scrapePool{ + appender: app, + tgroups: map[string]map[model.Fingerprint]*Target{}, + } +} -// func (sl *scrapeLoop) update() {} +func (sp *scrapePool) stop() { + var wg sync.WaitGroup -// func (sl *scrapeLoop) run(ctx context.Context) { -// var wg sync.WaitGroup + sp.mtx.RLock() -// wg.Wait() -// } + for _, tgroup := range sp.tgroups { + for _, t := range tgroup { + wg.Add(1) -// func (sl *scrapeLoop) stop() { + go func(t *Target) { + t.scrapeLoop.stop() + wg.Done() + }(t) + } + } + sp.mtx.RUnlock() -// } + wg.Wait() +} + +func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { + sp.mtx.Lock() + + var ( + wg sync.WaitGroup + newTgroups = map[string]map[model.Fingerprint]*Target{} + ) + + for source, targets := range tgroups { + var ( + prevTargets = sp.tgroups[source] + newTargets = map[model.Fingerprint]*Target{} + ) + newTgroups[source] = newTargets + + for fp, tnew := range targets { + // If the same target existed before, we let it run and replace + // the new one with it. + if told, ok := prevTargets[fp]; ok { + newTargets[fp] = told + } else { + newTargets[fp] = tnew + + tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, tnew.wrapAppender(sp.appender), tnew.wrapReportingAppender(sp.appender)) + go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout()) + } + } + for fp, told := range prevTargets { + // A previous target is no longer in the group. + if _, ok := targets[fp]; !ok { + wg.Add(1) + + go func(told *Target) { + told.scrapeLoop.stop() + wg.Done() + }(told) + } + } + } + + // Stop scrapers for target groups that disappeared completely. + for source, targets := range sp.tgroups { + if _, ok := tgroups[source]; ok { + continue + } + for _, told := range targets { + wg.Add(1) + + go func(told *Target) { + told.scrapeLoop.stop() + wg.Done() + }(told) + } + } + + sp.tgroups = newTgroups + + // Wait for all potentially stopped scrapers to terminate. + // This covers the case of flapping targets. If the server is under high load, a new scraper + // may be active and tries to insert. The old scraper that didn't terminate yet could still + // be inserting a previous sample set. + wg.Wait() + + // TODO(fabxc): maybe this can be released earlier with subsequent refactoring. + sp.mtx.Unlock() +} + +type loop interface { + run(interval, timeout time.Duration) + stop() +} + +type scrapeLoop struct { + scraper scraper + + appender storage.SampleAppender + reportAppender storage.SampleAppender + + done chan struct{} + mtx sync.RWMutex + ctx context.Context + cancel func() +} + +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) *scrapeLoop { + sl := &scrapeLoop{ + scraper: sc, + appender: app, + reportAppender: reportApp, + done: make(chan struct{}), + } + sl.ctx, sl.cancel = context.WithCancel(ctx) + + return sl +} + +func (sl *scrapeLoop) run(interval, timeout time.Duration) { + defer close(sl.done) + + select { + case <-time.After(sl.scraper.offset(interval)): + // Continue after a scraping offset. + case <-sl.ctx.Done(): + return + } + + var last time.Time + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-sl.ctx.Done(): + return + default: + } + + if sl.appender.NeedsThrottling() { + targetSkippedScrapes.WithLabelValues(interval.String()).Inc() + continue + } + targetIntervalLength.WithLabelValues(interval.String()).Observe( + float64(time.Since(last)) / float64(time.Second), // Sub-second precision. + ) + + var ( + start = time.Now() + scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + ) + + samples, err := sl.scraper.scrape(scrapeCtx) + if err == nil { + sl.append(samples) + } + + sl.report(start, time.Since(start), err) + + select { + case <-sl.ctx.Done(): + return + case <-ticker.C: + } + + last = start + } +} + +func (sl *scrapeLoop) stop() { + sl.mtx.RLock() + sl.cancel() + sl.mtx.RUnlock() + + <-sl.done +} + +func (sl *scrapeLoop) append(samples model.Samples) { + numOutOfOrder := 0 + + for _, s := range samples { + if err := sl.appender.Append(s); err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + log.Warnf("Error inserting sample: %s", err) + } + } + } + if numOutOfOrder > 0 { + log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") + } +} + +func (sl *scrapeLoop) report(start time.Time, duration time.Duration, err error) { + sl.scraper.report(start, duration, err) + + ts := model.TimeFromUnixNano(start.UnixNano()) + + var health model.SampleValue + if err == nil { + health = 1 + } + + healthSample := &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: scrapeHealthMetricName, + }, + Timestamp: ts, + Value: health, + } + durationSample := &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: scrapeDurationMetricName, + }, + Timestamp: ts, + Value: model.SampleValue(float64(duration) / float64(time.Second)), + } + + sl.reportAppender.Append(healthSample) + sl.reportAppender.Append(durationSample) +} diff --git a/retrieval/target.go b/retrieval/target.go index 43c19ca4e4..48e73cac78 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -14,7 +14,6 @@ package retrieval import ( - "errors" "fmt" "io" "io/ioutil" @@ -24,58 +23,16 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) -const ( - scrapeHealthMetricName = "up" - scrapeDurationMetricName = "scrape_duration_seconds" - - // Capacity of the channel to buffer samples during ingestion. - ingestedSamplesCap = 256 - - // Constants for instrumentation. - namespace = "prometheus" - interval = "interval" -) - -var ( - errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") - - targetIntervalLength = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "target_interval_length_seconds", - Help: "Actual intervals between scrapes.", - Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, - }, - []string{interval}, - ) - targetSkippedScrapes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "target_skipped_scrapes_total", - Help: "Total number of scrapes that were skipped because the metric storage was throttled.", - }, - []string{interval}, - ) -) - -func init() { - prometheus.MustRegister(targetIntervalLength) - prometheus.MustRegister(targetSkippedScrapes) -} - // TargetHealth describes the health state of a target. type TargetHealth int @@ -163,10 +120,8 @@ func (ts *TargetStatus) setLastError(err error) { type Target struct { // The status object for the target. It is only set once on initialization. status *TargetStatus - // Closing scraperStopping signals that scraping should stop. - scraperStopping chan struct{} - // Closing scraperStopped signals that scraping has been stopped. - scraperStopped chan struct{} + + scrapeLoop *scrapeLoop // Mutex protects the members below. sync.RWMutex @@ -189,13 +144,11 @@ func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Ta return nil, err } t := &Target{ - status: &TargetStatus{}, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), - scrapeConfig: cfg, - labels: labels, - metaLabels: metaLabels, - httpClient: client, + status: &TargetStatus{}, + scrapeConfig: cfg, + labels: labels, + metaLabels: metaLabels, + httpClient: client, } return t, nil } @@ -385,184 +338,60 @@ func (t *Target) InstanceIdentifier() string { return t.host() } -// RunScraper implements Target. -func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { - log.Debugf("Running scraper for %v", t) - - defer close(t.scraperStopped) - - lastScrapeInterval := t.interval() - - select { - case <-time.After(t.offset(lastScrapeInterval)): - // Continue after scraping offset. - case <-t.scraperStopping: - return - } - - ticker := time.NewTicker(lastScrapeInterval) - defer ticker.Stop() - - t.scrape(sampleAppender) - - // Explanation of the contraption below: - // - // In case t.scraperStopping has something to receive, we want to read - // from that channel rather than starting a new scrape (which might take very - // long). That's why the outer select has no ticker.C. Should t.scraperStopping - // not have anything to receive, we go into the inner select, where ticker.C - // is in the mix. - for { - select { - case <-t.scraperStopping: - return - default: - select { - case <-t.scraperStopping: - return - case <-ticker.C: - took := time.Since(t.status.LastScrape()) - - intervalStr := lastScrapeInterval.String() - - // On changed scrape interval the new interval becomes effective - // after the next scrape. - if iv := t.interval(); iv != lastScrapeInterval { - ticker.Stop() - ticker = time.NewTicker(iv) - lastScrapeInterval = iv - } - - targetIntervalLength.WithLabelValues(intervalStr).Observe( - float64(took) / float64(time.Second), // Sub-second precision. - ) - if sampleAppender.NeedsThrottling() { - targetSkippedScrapes.WithLabelValues(intervalStr).Inc() - t.status.setLastError(errSkippedScrape) - continue - } - t.scrape(sampleAppender) - } - } - } -} - -// StopScraper implements Target. -func (t *Target) StopScraper() { - log.Debugf("Stopping scraper for target %v...", t) - - close(t.scraperStopping) - <-t.scraperStopped - - log.Debugf("Scraper for target %v stopped.", t) -} - const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` -func (t *Target) scrape(appender storage.SampleAppender) error { - var ( - err error - start = time.Now() - ) - defer func(appender storage.SampleAppender) { - t.report(appender, start, time.Since(start), err) - }(appender) - +func (t *Target) scrape(ctx context.Context) (model.Samples, error) { t.RLock() - - appender = t.wrapAppender(appender) - client := t.httpClient t.RUnlock() + start := time.Now() + req, err := http.NewRequest("GET", t.URL().String(), nil) if err != nil { - return err + return nil, err } req.Header.Add("Accept", acceptHeader) - ctx, _ := context.WithTimeout(context.Background(), t.timeout()) resp, err := ctxhttp.Do(ctx, client, req) if err != nil { - return err + return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("server returned HTTP status %s", resp.Status) + return nil, fmt.Errorf("server returned HTTP status %s", resp.Status) } - dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)) - + var ( + allSamples = make(model.Samples, 0, 200) + decSamples = make(model.Vector, 0, 50) + ) sdec := expfmt.SampleDecoder{ - Dec: dec, + Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)), Opts: &expfmt.DecodeOptions{ Timestamp: model.TimeFromUnixNano(start.UnixNano()), }, } - var ( - samples model.Vector - numOutOfOrder int - logger = log.With("target", t.InstanceIdentifier()) - ) for { - if err = sdec.Decode(&samples); err != nil { + if err = sdec.Decode(&decSamples); err != nil { break } - for _, s := range samples { - err := appender.Append(s) - if err != nil { - if err == local.ErrOutOfOrderSample { - numOutOfOrder++ - } else { - logger.With("sample", s).Warnf("Error inserting sample: %s", err) - } - } - - } - } - if numOutOfOrder > 0 { - logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") + allSamples = append(allSamples, decSamples...) + decSamples = decSamples[:0] } if err == io.EOF { // Set err to nil since it is used in the scrape health recording. err = nil } - return err + return allSamples, err } -func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) { - t.status.setLastScrape(start) +func (t *Target) report(start time.Time, dur time.Duration, err error) { t.status.setLastError(err) - - ts := model.TimeFromUnixNano(start.UnixNano()) - - var health model.SampleValue - if err == nil { - health = 1 - } - - healthSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - }, - Timestamp: ts, - Value: health, - } - durationSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - }, - Timestamp: ts, - Value: model.SampleValue(float64(duration) / float64(time.Second)), - } - - app = t.wrapReportingAppender(app) - - app.Append(healthSample) - app.Append(durationSample) + t.status.setLastScrape(start) } // Merges the ingested sample's metric with the label set. On a collision the diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 6f2cd67f5a..bc750f4b2e 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -323,107 +323,6 @@ func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error { return nil } -// scrapePool manages scrapes for sets of targets. -type scrapePool struct { - appender storage.SampleAppender - - ctx context.Context - mtx sync.RWMutex - tgroups map[string]map[model.Fingerprint]*Target -} - -func newScrapePool(app storage.SampleAppender) *scrapePool { - return &scrapePool{ - appender: app, - tgroups: map[string]map[model.Fingerprint]*Target{}, - } -} - -func (sp *scrapePool) stop() { - var wg sync.WaitGroup - - sp.mtx.RLock() - - for _, tgroup := range sp.tgroups { - for _, t := range tgroup { - wg.Add(1) - - go func(t *Target) { - t.StopScraper() - wg.Done() - }(t) - } - } - sp.mtx.RUnlock() - - wg.Wait() -} - -func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { - sp.mtx.Lock() - - var ( - wg sync.WaitGroup - newTgroups = map[string]map[model.Fingerprint]*Target{} - ) - - for source, targets := range tgroups { - var ( - prevTargets = sp.tgroups[source] - newTargets = map[model.Fingerprint]*Target{} - ) - newTgroups[source] = newTargets - - for fp, tnew := range targets { - // If the same target existed before, we let it run and replace - // the new one with it. - if told, ok := prevTargets[fp]; ok { - newTargets[fp] = told - } else { - newTargets[fp] = tnew - go tnew.RunScraper(sp.appender) - } - } - for fp, told := range prevTargets { - // A previous target is no longer in the group. - if _, ok := targets[fp]; !ok { - wg.Add(1) - - go func(told *Target) { - told.StopScraper() - wg.Done() - }(told) - } - } - } - - // Stop scrapers for target groups that disappeared completely. - for source, targets := range sp.tgroups { - if _, ok := tgroups[source]; ok { - continue - } - for _, told := range targets { - wg.Add(1) - - go func(told *Target) { - told.StopScraper() - wg.Done() - }(told) - } - } - - sp.tgroups = newTgroups - - // Wait for all potentially stopped scrapers to terminate. - // This covers the case of flapping targets. If the server is under high load, a new scraper - // may be active and tries to insert. The old scraper that didn't terminate yet could still - // be inserting a previous sample set. - wg.Wait() - - // TODO(fabxc): maybe this can be released earlier with subsequent refactoring. - sp.mtx.Unlock() -} - // providersFromConfig returns all TargetProviders configured in cfg. func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { providers := map[string]TargetProvider{}