From 775316f8d2fffe840b9fa7403a16eb40cd60bbea Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 23 Feb 2016 11:56:09 +0100 Subject: [PATCH 1/5] Move appender construction from Target to scrapePool --- retrieval/scrape.go | 41 +++++++++++++++++++- retrieval/scrape_test.go | 79 +++++++++++++++++++++++++++++++++++++- retrieval/target.go | 41 +------------------- retrieval/target_test.go | 76 ------------------------------------ retrieval/targetmanager.go | 2 +- 5 files changed, 120 insertions(+), 119 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 507dab878c..9c52e6e1e3 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" ) @@ -69,6 +70,7 @@ func init() { // scrapePool manages scrapes for sets of targets. type scrapePool struct { appender storage.SampleAppender + config *config.ScrapeConfig ctx context.Context mtx sync.RWMutex @@ -77,9 +79,10 @@ type scrapePool struct { targets map[model.Fingerprint]loop } -func newScrapePool(app storage.SampleAppender) *scrapePool { +func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { return &scrapePool{ appender: app, + config: cfg, tgroups: map[string]map[model.Fingerprint]*Target{}, } } @@ -104,6 +107,40 @@ func (sp *scrapePool) stop() { wg.Wait() } +// sampleAppender returns an appender for ingested samples from the target. +func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { + app := sp.appender + // The relabelAppender has to be inside the label-modifying appenders + // so the relabeling rules are applied to the correct label set. + if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { + app = relabelAppender{ + SampleAppender: app, + relabelings: mrc, + } + } + + if sp.config.HonorLabels { + app = honorLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } else { + app = ruleLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } + return app +} + +// reportAppender returns an appender for reporting samples for the target. +func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { + return ruleLabelsAppender{ + SampleAppender: sp.appender, + labels: target.Labels(), + } +} + func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { sp.mtx.Lock() @@ -127,7 +164,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { } else { newTargets[fp] = tnew - tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, tnew.wrapAppender(sp.appender), tnew.wrapReportingAppender(sp.appender)) + tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, sp.sampleAppender(tnew), sp.reportAppender(tnew)) go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil) } } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 142bc13af2..ee6dfbc20b 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -20,9 +20,86 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" - // "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/config" ) +func TestScrapePoolReportAppender(t *testing.T) { + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: []*config.RelabelConfig{ + {}, {}, {}, + }, + } + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + app := &nopAppender{} + + sp := newScrapePool(cfg, app) + + cfg.HonorLabels = false + wrapped := sp.reportAppender(target) + + rl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + if rl.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", rl.SampleAppender) + } + + cfg.HonorLabels = true + wrapped = sp.reportAppender(target) + + hl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + if hl.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", hl.SampleAppender) + } +} + +func TestScrapePoolSampleAppender(t *testing.T) { + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: []*config.RelabelConfig{ + {}, {}, {}, + }, + } + + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + app := &nopAppender{} + + sp := newScrapePool(cfg, app) + + cfg.HonorLabels = false + wrapped := sp.sampleAppender(target) + + rl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + re, ok := rl.SampleAppender.(relabelAppender) + if !ok { + t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) + } + if re.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", re.SampleAppender) + } + + cfg.HonorLabels = true + wrapped = sp.sampleAppender(target) + + hl, ok := wrapped.(honorLabelsAppender) + if !ok { + t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) + } + re, ok = hl.SampleAppender.(relabelAppender) + if !ok { + t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) + } + if re.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", re.SampleAppender) + } +} + func TestScrapeLoopRun(t *testing.T) { var ( signal = make(chan struct{}) diff --git a/retrieval/target.go b/retrieval/target.go index d341d98dfc..0074b6e2d5 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -121,13 +121,12 @@ type Target struct { // The status object for the target. It is only set once on initialization. status *TargetStatus - scrapeLoop *scrapeLoop + scrapeLoop *scrapeLoop + scrapeConfig *config.ScrapeConfig // Mutex protects the members below. sync.RWMutex - scrapeConfig *config.ScrapeConfig - // Labels before any processing. metaLabels model.LabelSet // Any labels that are added to this target and its metrics. @@ -265,42 +264,6 @@ func (t *Target) path() string { return string(t.labels[model.MetricsPathLabel]) } -// wrapAppender wraps a SampleAppender for samples ingested from the target. -// RLock must be acquired by the caller. -func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender { - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. - if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - SampleAppender: app, - relabelings: mrc, - } - } - - if t.scrapeConfig.HonorLabels { - app = honorLabelsAppender{ - SampleAppender: app, - labels: t.unlockedLabels(), - } - } else { - app = ruleLabelsAppender{ - SampleAppender: app, - labels: t.unlockedLabels(), - } - } - return app -} - -// wrapReportingAppender wraps an appender for target status report samples. -// It ignores any relabeling rules set for the target. -// RLock must not be acquired by the caller. -func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender { - return ruleLabelsAppender{ - SampleAppender: app, - labels: t.Labels(), - } -} - // URL returns a copy of the target's URL. func (t *Target) URL() *url.URL { t.RLock() diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 54a89b2295..a6c77d2320 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -92,82 +92,6 @@ func TestTargetOffset(t *testing.T) { } } -func TestTargetWrapReportingAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - target.scrapeConfig = cfg - app := &nopAppender{} - - cfg.HonorLabels = false - wrapped := target.wrapReportingAppender(app) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if rl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", rl.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = target.wrapReportingAppender(app) - - hl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if hl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", hl.SampleAppender) - } -} - -func TestTargetWrapAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - target.scrapeConfig = cfg - app := &nopAppender{} - - cfg.HonorLabels = false - wrapped := target.wrapAppender(app) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - re, ok := rl.SampleAppender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) - } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = target.wrapAppender(app) - - hl, ok := wrapped.(honorLabelsAppender) - if !ok { - t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) - } - re, ok = hl.SampleAppender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) - } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) - } -} - func TestTargetScrape404(t *testing.T) { server := httptest.NewServer( http.HandlerFunc( diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index bc750f4b2e..9ffc0d6214 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -184,7 +184,7 @@ type targetSet struct { func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { ts := &targetSet{ tgroups: map[string]map[model.Fingerprint]*Target{}, - scrapePool: newScrapePool(app), + scrapePool: newScrapePool(cfg, app), syncCh: make(chan struct{}, 1), config: cfg, } From 02f635dc241f7de0cf99ee3af954bd2e764ec3a3 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 23 Feb 2016 12:06:08 +0100 Subject: [PATCH 2/5] Remove interval/timeout from Target internals --- retrieval/scrape.go | 2 +- retrieval/target.go | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 9c52e6e1e3..7f27d2d154 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -165,7 +165,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { newTargets[fp] = tnew tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, sp.sampleAppender(tnew), sp.reportAppender(tnew)) - go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil) + go tnew.scrapeLoop.run(time.Duration(sp.config.ScrapeInterval), time.Duration(sp.config.ScrapeTimeout), nil) } } for fp, told := range prevTargets { diff --git a/retrieval/target.go b/retrieval/target.go index 0074b6e2d5..81d3e1bd83 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -229,20 +229,6 @@ func (t *Target) offset(interval time.Duration) time.Duration { return time.Duration(next) } -func (t *Target) interval() time.Duration { - t.RLock() - defer t.RUnlock() - - return time.Duration(t.scrapeConfig.ScrapeInterval) -} - -func (t *Target) timeout() time.Duration { - t.RLock() - defer t.RUnlock() - - return time.Duration(t.scrapeConfig.ScrapeTimeout) -} - func (t *Target) scheme() string { t.RLock() defer t.RUnlock() From 84f74b9a840f375acbb95bb1259ba24a43269ca2 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 23 Feb 2016 13:34:24 +0100 Subject: [PATCH 3/5] Apply new scrape config on reload. This commit updates a target set's scrape configuration on reload. This will cause all running scrape loops to be stopped and started again with new parameters. --- retrieval/scrape.go | 32 +++++++++++++++++++++++++++++--- retrieval/targetmanager.go | 10 ++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 7f27d2d154..5586671e1d 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -107,6 +107,34 @@ func (sp *scrapePool) stop() { wg.Wait() } +func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { + log.Debugln("reload scrapepool") + defer log.Debugln("reload done") + + sp.mtx.Lock() + defer sp.mtx.Unlock() + + sp.config = cfg + + var wg sync.WaitGroup + + for _, tgroup := range sp.tgroups { + for _, t := range tgroup { + wg.Add(1) + + go func(t *Target) { + t.scrapeLoop.stop() + + t.scrapeLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) + go t.scrapeLoop.run(time.Duration(cfg.ScrapeInterval), time.Duration(cfg.ScrapeTimeout), nil) + wg.Done() + }(t) + } + } + + wg.Wait() +} + // sampleAppender returns an appender for ingested samples from the target. func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { app := sp.appender @@ -143,6 +171,7 @@ func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { sp.mtx.Lock() + defer sp.mtx.Unlock() var ( wg sync.WaitGroup @@ -203,9 +232,6 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { // may be active and tries to insert. The old scraper that didn't terminate yet could still // be inserting a previous sample set. wg.Wait() - - // TODO(fabxc): maybe this can be released earlier with subsequent refactoring. - sp.mtx.Unlock() } // A scraper retrieves samples and accepts a status report at the end. diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 9ffc0d6214..474dbaf86a 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -117,6 +117,8 @@ func (tm *TargetManager) reload() { ts.runScraping(tm.ctx) tm.wg.Done() }(ts) + } else { + ts.reload(scfg) } ts.runProviders(tm.ctx, providersFromConfig(scfg)) } @@ -203,6 +205,14 @@ func (ts *targetSet) cancel() { } } +func (ts *targetSet) reload(cfg *config.ScrapeConfig) { + ts.mtx.Lock() + ts.config = cfg + ts.mtx.Unlock() + + ts.scrapePool.reload(cfg) +} + func (ts *targetSet) runScraping(ctx context.Context) { ctx, ts.cancelScraping = context.WithCancel(ctx) From 76a8c6160d4659f32d42a923d7f556ee69352f5a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 23 Feb 2016 14:37:25 +0100 Subject: [PATCH 4/5] Deduplicate targets in scrape pool. With this commit the scrape pool deduplicates incoming targets before scraping them. This way multiple target providers can produce the same target but it will be scraped only once. --- retrieval/scrape.go | 130 ++++++++++++++++++------------------- retrieval/targetmanager.go | 35 ++++++---- 2 files changed, 85 insertions(+), 80 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 5586671e1d..d23d38ff20 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -72,64 +72,72 @@ type scrapePool struct { appender storage.SampleAppender config *config.ScrapeConfig - ctx context.Context - mtx sync.RWMutex - tgroups map[string]map[model.Fingerprint]*Target + ctx context.Context - targets map[model.Fingerprint]loop + mtx sync.RWMutex + targets map[model.Fingerprint]*Target + loops map[model.Fingerprint]loop } func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { return &scrapePool{ appender: app, config: cfg, - tgroups: map[string]map[model.Fingerprint]*Target{}, + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, } } +// stop terminates all scrape loops and returns after they all terminated. +// A stopped scrape pool must not be used again. func (sp *scrapePool) stop() { var wg sync.WaitGroup sp.mtx.RLock() - for _, tgroup := range sp.tgroups { - for _, t := range tgroup { - wg.Add(1) + for _, l := range sp.loops { + wg.Add(1) - go func(t *Target) { - t.scrapeLoop.stop() - wg.Done() - }(t) - } + go func(l loop) { + l.stop() + wg.Done() + }(l) } sp.mtx.RUnlock() wg.Wait() } +// reload the scrape pool with the given scrape configuration. The target state is preserved +// but all scrape loops are restarted with the new scrape configuration. +// This method returns after all scrape loops that were stopped have fully terminated. func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { - log.Debugln("reload scrapepool") - defer log.Debugln("reload done") - sp.mtx.Lock() defer sp.mtx.Unlock() sp.config = cfg - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) + ) - for _, tgroup := range sp.tgroups { - for _, t := range tgroup { - wg.Add(1) + for fp, oldLoop := range sp.loops { + var ( + t = sp.targets[fp] + newLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) + ) + wg.Add(1) - go func(t *Target) { - t.scrapeLoop.stop() + go func(oldLoop, newLoop loop) { + oldLoop.stop() + wg.Done() - t.scrapeLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) - go t.scrapeLoop.run(time.Duration(cfg.ScrapeInterval), time.Duration(cfg.ScrapeTimeout), nil) - wg.Done() - }(t) - } + go newLoop.run(interval, timeout, nil) + }(oldLoop, newLoop) + + sp.loops[fp] = newLoop } wg.Wait() @@ -169,64 +177,49 @@ func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { } } -func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { +// sync takes a list of potentially duplicated targets, deduplicates them, starts +// scrape loops for new targets, and stops scrape loops for disappeared targets. +// It returns after all stopped scrape loops terminated. +func (sp *scrapePool) sync(targets []*Target) { sp.mtx.Lock() defer sp.mtx.Unlock() var ( - wg sync.WaitGroup - newTgroups = map[string]map[model.Fingerprint]*Target{} + fingerprints = map[model.Fingerprint]struct{}{} + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) ) - for source, targets := range tgroups { - var ( - prevTargets = sp.tgroups[source] - newTargets = map[model.Fingerprint]*Target{} - ) - newTgroups[source] = newTargets + for _, t := range targets { + fp := t.fingerprint() + fingerprints[fp] = struct{}{} - for fp, tnew := range targets { - // If the same target existed before, we let it run and replace - // the new one with it. - if told, ok := prevTargets[fp]; ok { - newTargets[fp] = told - } else { - newTargets[fp] = tnew + if _, ok := sp.targets[fp]; !ok { + l := newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) - tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, sp.sampleAppender(tnew), sp.reportAppender(tnew)) - go tnew.scrapeLoop.run(time.Duration(sp.config.ScrapeInterval), time.Duration(sp.config.ScrapeTimeout), nil) - } - } - for fp, told := range prevTargets { - // A previous target is no longer in the group. - if _, ok := targets[fp]; !ok { - wg.Add(1) + sp.targets[fp] = t + sp.loops[fp] = l - go func(told *Target) { - told.scrapeLoop.stop() - wg.Done() - }(told) - } + go l.run(interval, timeout, nil) } } - // Stop scrapers for target groups that disappeared completely. - for source, targets := range sp.tgroups { - if _, ok := tgroups[source]; ok { - continue - } - for _, told := range targets { + var wg sync.WaitGroup + + // Stop and remove old targets and scraper loops. + for fp := range sp.targets { + if _, ok := fingerprints[fp]; !ok { wg.Add(1) - - go func(told *Target) { - told.scrapeLoop.stop() + go func(l loop) { + l.stop() wg.Done() - }(told) + }(sp.loops[fp]) + + delete(sp.loops, fp) + delete(sp.targets, fp) } } - sp.tgroups = newTgroups - // Wait for all potentially stopped scrapers to terminate. // This covers the case of flapping targets. If the server is under high load, a new scraper // may be active and tries to insert. The old scraper that didn't terminate yet could still @@ -241,6 +234,7 @@ type scraper interface { offset(interval time.Duration) time.Duration } +// A loop can run and be stopped again. It must be reused after it was stopped. type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 474dbaf86a..d0f0960a1a 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -142,12 +142,14 @@ func (tm *TargetManager) Pools() map[string][]*Target { // TODO(fabxc): this is just a hack to maintain compatibility for now. for _, ps := range tm.targetSets { - for _, ts := range ps.scrapePool.tgroups { - for _, t := range ts { - job := string(t.Labels()[model.JobLabel]) - pools[job] = append(pools[job], t) - } + ps.scrapePool.mtx.RLock() + + for _, t := range ps.scrapePool.targets { + job := string(t.Labels()[model.JobLabel]) + pools[job] = append(pools[job], t) } + + ps.scrapePool.mtx.RUnlock() } return pools } @@ -168,10 +170,12 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { } // targetSet holds several TargetProviders for which the same scrape configuration -// is used. It runs the target providers and starts and stops scrapers as it -// receives target updates. +// is used. It maintains target groups from all given providers and sync them +// to a scrape pool. type targetSet struct { - mtx sync.RWMutex + mtx sync.RWMutex + + // Sets of targets by a source string that is unique across target providers. tgroups map[string]map[model.Fingerprint]*Target providers map[string]TargetProvider @@ -231,7 +235,9 @@ Loop: case <-ctx.Done(): break Loop case <-ts.syncCh: + ts.mtx.RLock() ts.sync() + ts.mtx.RUnlock() } } @@ -241,9 +247,13 @@ Loop: } func (ts *targetSet) sync() { - // TODO(fabxc): temporary simple version. For a deduplicating scrape pool we will - // submit a list of all targets. - ts.scrapePool.sync(ts.tgroups) + targets := []*Target{} + for _, tgroup := range ts.tgroups { + for _, t := range tgroup { + targets = append(targets, t) + } + } + ts.scrapePool.sync(targets) } func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { @@ -308,8 +318,9 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ go prov.Run(ctx, updates) } + // We wait for a full initial set of target groups before releasing the mutex + // to ensure the initial sync is complete and there are no races with subsequent updates. wg.Wait() - ts.sync() } From 9bea27ae8a7425d56e17ad19d44b84a7b6cf43cf Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 28 Feb 2016 09:51:02 +0100 Subject: [PATCH 5/5] Add scraping tests --- retrieval/scrape.go | 126 ++++++++++---------- retrieval/scrape_test.go | 241 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 306 insertions(+), 61 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index d23d38ff20..34326c13e6 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -74,9 +74,14 @@ type scrapePool struct { ctx context.Context + // Targets and loops must always be synchronized to have the same + // set of fingerprints. mtx sync.RWMutex targets map[model.Fingerprint]*Target loops map[model.Fingerprint]loop + + // Constructor for new scrape loops. This is settable for testing convenience. + newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop } func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { @@ -85,25 +90,28 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape config: cfg, targets: map[model.Fingerprint]*Target{}, loops: map[model.Fingerprint]loop{}, + newLoop: newScrapeLoop, } } // stop terminates all scrape loops and returns after they all terminated. -// A stopped scrape pool must not be used again. func (sp *scrapePool) stop() { var wg sync.WaitGroup - sp.mtx.RLock() + sp.mtx.Lock() + defer sp.mtx.Unlock() - for _, l := range sp.loops { + for fp, l := range sp.loops { wg.Add(1) go func(l loop) { l.stop() wg.Done() }(l) + + delete(sp.loops, fp) + delete(sp.targets, fp) } - sp.mtx.RUnlock() wg.Wait() } @@ -126,7 +134,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { for fp, oldLoop := range sp.loops { var ( t = sp.targets[fp] - newLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) + newLoop = sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) ) wg.Add(1) @@ -143,6 +151,56 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { wg.Wait() } +// sync takes a list of potentially duplicated targets, deduplicates them, starts +// scrape loops for new targets, and stops scrape loops for disappeared targets. +// It returns after all stopped scrape loops terminated. +func (sp *scrapePool) sync(targets []*Target) { + sp.mtx.Lock() + defer sp.mtx.Unlock() + + var ( + fingerprints = map[model.Fingerprint]struct{}{} + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) + ) + + for _, t := range targets { + fp := t.fingerprint() + fingerprints[fp] = struct{}{} + + if _, ok := sp.targets[fp]; !ok { + l := sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) + + sp.targets[fp] = t + sp.loops[fp] = l + + go l.run(interval, timeout, nil) + } + } + + var wg sync.WaitGroup + + // Stop and remove old targets and scraper loops. + for fp := range sp.targets { + if _, ok := fingerprints[fp]; !ok { + wg.Add(1) + go func(l loop) { + l.stop() + wg.Done() + }(sp.loops[fp]) + + delete(sp.loops, fp) + delete(sp.targets, fp) + } + } + + // Wait for all potentially stopped scrapers to terminate. + // This covers the case of flapping targets. If the server is under high load, a new scraper + // may be active and tries to insert. The old scraper that didn't terminate yet could still + // be inserting a previous sample set. + wg.Wait() +} + // sampleAppender returns an appender for ingested samples from the target. func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { app := sp.appender @@ -177,56 +235,6 @@ func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { } } -// sync takes a list of potentially duplicated targets, deduplicates them, starts -// scrape loops for new targets, and stops scrape loops for disappeared targets. -// It returns after all stopped scrape loops terminated. -func (sp *scrapePool) sync(targets []*Target) { - sp.mtx.Lock() - defer sp.mtx.Unlock() - - var ( - fingerprints = map[model.Fingerprint]struct{}{} - interval = time.Duration(sp.config.ScrapeInterval) - timeout = time.Duration(sp.config.ScrapeTimeout) - ) - - for _, t := range targets { - fp := t.fingerprint() - fingerprints[fp] = struct{}{} - - if _, ok := sp.targets[fp]; !ok { - l := newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) - - sp.targets[fp] = t - sp.loops[fp] = l - - go l.run(interval, timeout, nil) - } - } - - var wg sync.WaitGroup - - // Stop and remove old targets and scraper loops. - for fp := range sp.targets { - if _, ok := fingerprints[fp]; !ok { - wg.Add(1) - go func(l loop) { - l.stop() - wg.Done() - }(sp.loops[fp]) - - delete(sp.loops, fp) - delete(sp.targets, fp) - } - } - - // Wait for all potentially stopped scrapers to terminate. - // This covers the case of flapping targets. If the server is under high load, a new scraper - // may be active and tries to insert. The old scraper that didn't terminate yet could still - // be inserting a previous sample set. - wg.Wait() -} - // A scraper retrieves samples and accepts a status report at the end. type scraper interface { scrape(ctx context.Context, ts time.Time) (model.Samples, error) @@ -234,7 +242,7 @@ type scraper interface { offset(interval time.Duration) time.Duration } -// A loop can run and be stopped again. It must be reused after it was stopped. +// A loop can run and be stopped again. It must not be reused after it was stopped. type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() @@ -247,12 +255,11 @@ type scrapeLoop struct { reportAppender storage.SampleAppender done chan struct{} - mtx sync.RWMutex ctx context.Context cancel func() } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) *scrapeLoop { +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { sl := &scrapeLoop{ scraper: sc, appender: app, @@ -321,10 +328,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { } func (sl *scrapeLoop) stop() { - sl.mtx.RLock() sl.cancel() - sl.mtx.RUnlock() - <-sl.done } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index ee6dfbc20b..f40bf9b9ad 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -14,6 +14,9 @@ package retrieval import ( + "fmt" + "reflect" + "sync" "testing" "time" @@ -21,8 +24,199 @@ import ( "golang.org/x/net/context" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" ) +func TestNewScrapePool(t *testing.T) { + var ( + app = &nopAppender{} + cfg = &config.ScrapeConfig{} + sp = newScrapePool(cfg, app) + ) + + if a, ok := sp.appender.(*nopAppender); !ok || a != app { + t.Fatalf("Wrong sample appender") + } + if sp.config != cfg { + t.Fatalf("Wrong scrape config") + } + if sp.newLoop == nil { + t.Fatalf("newLoop function not initialized") + } +} + +type testLoop struct { + startFunc func(interval, timeout time.Duration, errc chan<- error) + stopFunc func() +} + +func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { + l.startFunc(interval, timeout, errc) +} + +func (l *testLoop) stop() { + l.stopFunc() +} + +func TestScrapePoolStop(t *testing.T) { + sp := &scrapePool{ + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, + } + var mtx sync.Mutex + stopped := map[model.Fingerprint]bool{} + numTargets := 20 + + // Stopping the scrape pool must call stop() on all scrape loops, + // clean them and the respective targets up. It must wait until each loop's + // stop function returned before returning itself. + + for i := 0; i < numTargets; i++ { + t := &Target{ + labels: model.LabelSet{ + model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), + }, + } + l := &testLoop{} + l.stopFunc = func() { + time.Sleep(time.Duration(i*20) * time.Millisecond) + + mtx.Lock() + stopped[t.fingerprint()] = true + mtx.Unlock() + } + + sp.targets[t.fingerprint()] = t + sp.loops[t.fingerprint()] = l + } + + done := make(chan struct{}) + stopTime := time.Now() + + go func() { + sp.stop() + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("scrapeLoop.stop() did not return as expected") + case <-done: + // This should have taken at least as long as the last target slept. + if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond { + t.Fatalf("scrapeLoop.stop() exited before all targets stopped") + } + } + + mtx.Lock() + if len(stopped) != numTargets { + t.Fatalf("Expected 20 stopped loops, got %d", len(stopped)) + } + mtx.Unlock() + + if len(sp.targets) > 0 { + t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets)) + } + if len(sp.loops) > 0 { + t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops)) + } +} + +func TestScrapePoolReload(t *testing.T) { + var mtx sync.Mutex + numTargets := 20 + + stopped := map[model.Fingerprint]bool{} + + reloadCfg := &config.ScrapeConfig{ + ScrapeInterval: model.Duration(3 * time.Second), + ScrapeTimeout: model.Duration(2 * time.Second), + } + // On starting to run, new loops created on reload check whether their preceeding + // equivalents have been stopped. + newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { + l := &testLoop{} + l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { + if interval != 3*time.Second { + t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval) + } + if timeout != 2*time.Second { + t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout) + } + mtx.Lock() + if !stopped[s.(*Target).fingerprint()] { + t.Errorf("Scrape loop for %v not stopped yet", s.(*Target)) + } + mtx.Unlock() + } + return l + } + sp := &scrapePool{ + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, + newLoop: newLoop, + } + + // Reloading a scrape pool with a new scrape configuration must stop all scrape + // loops and start new ones. A new loop must not be started before the preceeding + // one terminated. + + for i := 0; i < numTargets; i++ { + t := &Target{ + labels: model.LabelSet{ + model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), + }, + } + l := &testLoop{} + l.stopFunc = func() { + time.Sleep(time.Duration(i*20) * time.Millisecond) + + mtx.Lock() + stopped[t.fingerprint()] = true + mtx.Unlock() + } + + sp.targets[t.fingerprint()] = t + sp.loops[t.fingerprint()] = l + } + done := make(chan struct{}) + + beforeTargets := map[model.Fingerprint]*Target{} + for fp, t := range sp.targets { + beforeTargets[fp] = t + } + + reloadTime := time.Now() + + go func() { + sp.reload(reloadCfg) + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("scrapeLoop.reload() did not return as expected") + case <-done: + // This should have taken at least as long as the last target slept. + if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond { + t.Fatalf("scrapeLoop.stop() exited before all targets stopped") + } + } + + mtx.Lock() + if len(stopped) != numTargets { + t.Fatalf("Expected 20 stopped loops, got %d", stopped) + } + mtx.Unlock() + + if !reflect.DeepEqual(sp.targets, beforeTargets) { + t.Fatalf("Reloading affected target states unexpectedly") + } + if len(sp.loops) != numTargets { + t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops)) + } +} + func TestScrapePoolReportAppender(t *testing.T) { cfg := &config.ScrapeConfig{ MetricRelabelConfigs: []*config.RelabelConfig{ @@ -100,6 +294,53 @@ func TestScrapePoolSampleAppender(t *testing.T) { } } +func TestScrapeLoopStop(t *testing.T) { + scraper := &testScraper{} + sl := newScrapeLoop(context.Background(), scraper, nil, nil) + + // The scrape pool synchronizes on stopping scrape loops. However, new scrape + // loops are syarted asynchronously. Thus it's possible, that a loop is stopped + // again before having started properly. + // Stopping not-yet-started loops must block until the run method was called and exited. + // The run method must exit immediately. + + stopDone := make(chan struct{}) + go func() { + sl.stop() + close(stopDone) + }() + + select { + case <-stopDone: + t.Fatalf("Stopping terminated before run exited successfully") + case <-time.After(500 * time.Millisecond): + } + + // Running the scrape loop must exit before calling the scraper even once. + scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) { + t.Fatalf("scraper was called for terminated scrape loop") + return nil, nil + } + + runDone := make(chan struct{}) + go func() { + sl.run(0, 0, nil) + close(runDone) + }() + + select { + case <-runDone: + case <-time.After(1 * time.Second): + t.Fatalf("Running terminated scrape loop did not exit") + } + + select { + case <-stopDone: + case <-time.After(1 * time.Second): + t.Fatalf("Stopping did not terminate after running exited") + } +} + func TestScrapeLoopRun(t *testing.T) { var ( signal = make(chan struct{})