diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index fc2645a841..83310cd644 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -57,9 +57,8 @@ type ConsulDiscovery struct { tagSeparator string scrapedServices map[string]struct{} - mu sync.RWMutex - services map[string]*consulService - runDone, srvsDone chan struct{} + mu sync.RWMutex + services map[string]*consulService } // consulService contains data belonging to the same service. @@ -93,8 +92,6 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery { client: client, clientConf: clientConf, tagSeparator: conf.TagSeparator, - runDone: make(chan struct{}), - srvsDone: make(chan struct{}, 1), scrapedServices: map[string]struct{}{}, services: map[string]*consulService{}, } @@ -133,18 +130,22 @@ func (cd *ConsulDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) { +func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) + defer cd.stop() update := make(chan *consulService, 10) - go cd.watchServices(update) + go cd.watchServices(update, done) for { select { - case <-cd.runDone: + case <-done: return case srv := <-update: if srv.removed { + close(srv.done) + + // Send clearing update. ch <- &config.TargetGroup{Source: srv.name} break } @@ -157,31 +158,20 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) { } } -// Stop implements the TargetProvider interface. -func (cd *ConsulDiscovery) Stop() { - log.Debugf("Stopping Consul service discovery for %s", cd.clientConf.Address) - +func (cd *ConsulDiscovery) stop() { // The lock prevents Run from terminating while the watchers attempt // to send on their channels. cd.mu.Lock() defer cd.mu.Unlock() - // The watching goroutines will terminate after their next watch timeout. - // As this can take long, the channel is buffered and we do not wait. for _, srv := range cd.services { - srv.done <- struct{}{} + close(srv.done) } - cd.srvsDone <- struct{}{} - - // Terminate Run. - cd.runDone <- struct{}{} - - log.Debugf("Consul service discovery for %s stopped.", cd.clientConf.Address) } // watchServices retrieves updates from Consul's services endpoint and sends // potential updates to the update channel. -func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { +func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-chan struct{}) { var lastIndex uint64 for { catalog := cd.client.Catalog() @@ -191,8 +181,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { }) if err != nil { log.Errorf("Error refreshing service list: %s", err) - <-time.After(consulRetryInterval) - continue + time.Sleep(consulRetryInterval) } // If the index equals the previous one, the watch timed out with no update. if meta.LastIndex == lastIndex { @@ -202,7 +191,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { cd.mu.Lock() select { - case <-cd.srvsDone: + case <-done: cd.mu.Unlock() return default: @@ -218,7 +207,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { srv = &consulService{ name: name, tgroup: &config.TargetGroup{}, - done: make(chan struct{}, 1), + done: make(chan struct{}), } srv.tgroup.Source = name cd.services[name] = srv @@ -234,7 +223,6 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { if _, ok := srvs[name]; !ok { srv.removed = true update <- srv - srv.done <- struct{}{} delete(cd.services, name) } } @@ -253,7 +241,7 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.Ta }) if err != nil { log.Errorf("Error refreshing service %s: %s", srv.name, err) - <-time.After(consulRetryInterval) + time.Sleep(consulRetryInterval) continue } // If the index equals the previous one, the watch timed out with no update. diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go index f171dd6da8..138cfaf524 100644 --- a/retrieval/discovery/dns.go +++ b/retrieval/discovery/dns.go @@ -64,11 +64,11 @@ func init() { type DNSDiscovery struct { names []string - done chan struct{} - ticker *time.Ticker - m sync.RWMutex - port int - qtype uint16 + done chan struct{} + interval time.Duration + m sync.RWMutex + port int + qtype uint16 } // NewDNSDiscovery returns a new DNSDiscovery which periodically refreshes its targets. @@ -83,41 +83,34 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery { qtype = dns.TypeSRV } return &DNSDiscovery{ - names: conf.Names, - done: make(chan struct{}), - ticker: time.NewTicker(time.Duration(conf.RefreshInterval)), - qtype: qtype, - port: conf.Port, + names: conf.Names, + done: make(chan struct{}), + interval: time.Duration(conf.RefreshInterval), + qtype: qtype, + port: conf.Port, } } // Run implements the TargetProvider interface. -func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup) { +func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) + ticker := time.NewTicker(dd.interval) + defer ticker.Stop() + // Get an initial set right away. dd.refreshAll(ch) for { select { - case <-dd.ticker.C: + case <-ticker.C: dd.refreshAll(ch) - case <-dd.done: + case <-done: return } } } -// Stop implements the TargetProvider interface. -func (dd *DNSDiscovery) Stop() { - log.Debug("Stopping DNS discovery for %s...", dd.names) - - dd.ticker.Stop() - dd.done <- struct{}{} - - log.Debug("DNS discovery for %s stopped.", dd.names) -} - // Sources implements the TargetProvider interface. func (dd *DNSDiscovery) Sources() []string { var srcs []string diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index 4c0655f031..b5839a87ec 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -39,7 +39,6 @@ type FileDiscovery struct { paths []string watcher *fsnotify.Watcher interval time.Duration - done chan struct{} // lastRefresh stores which files were found during the last refresh // and how many target groups they contained. @@ -52,7 +51,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery { return &FileDiscovery{ paths: conf.Names, interval: time.Duration(conf.RefreshInterval), - done: make(chan struct{}), } } @@ -106,8 +104,9 @@ func (fd *FileDiscovery) watchFiles() { } // Run implements the TargetProvider interface. -func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) { +func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) + defer fd.stop() watcher, err := fsnotify.NewWatcher() if err != nil { @@ -125,10 +124,13 @@ func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) { // Stopping has priority over refreshing. Thus we wrap the actual select // clause to always catch done signals. select { - case <-fd.done: + case <-done: return default: select { + case <-done: + return + case event := <-fd.watcher.Events: // fsnotify sometimes sends a bunch of events without name or operation. // It's unclear what they are and why they are sent - filter them out. @@ -154,9 +156,6 @@ func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) { if err != nil { log.Errorf("Error on file watch: %s", err) } - - case <-fd.done: - return } } } @@ -198,11 +197,10 @@ func fileSource(filename string, i int) string { return fmt.Sprintf("%s:%d", filename, i) } -// Stop implements the TargetProvider interface. -func (fd *FileDiscovery) Stop() { +// stop shuts down the file watcher. +func (fd *FileDiscovery) stop() { log.Debugf("Stopping file discovery for %s...", fd.paths) - fd.done <- struct{}{} // Closing the watcher will deadlock unless all events and errors are drained. go func() { for { @@ -210,15 +208,13 @@ func (fd *FileDiscovery) Stop() { case <-fd.watcher.Errors: case <-fd.watcher.Events: // Drain all events and errors. - case <-fd.done: + default: return } } }() fd.watcher.Close() - fd.done <- struct{}{} - log.Debugf("File discovery for %s stopped.", fd.paths) } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 6d1f00c440..49e4fb175f 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -24,11 +24,13 @@ func testFileSD(t *testing.T, ext string) { conf.Names = []string{"fixtures/_*" + ext} conf.RefreshInterval = config.Duration(1 * time.Hour) - fsd := NewFileDiscovery(&conf) - - ch := make(chan *config.TargetGroup) - go fsd.Run(ch) - defer fsd.Stop() + var ( + fsd = NewFileDiscovery(&conf) + ch = make(chan *config.TargetGroup) + done = make(chan struct{}) + ) + go fsd.Run(ch, done) + defer close(done) select { case <-time.After(25 * time.Millisecond): diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go index 5fc4d07a95..a057e96e1a 100644 --- a/retrieval/discovery/marathon.go +++ b/retrieval/discovery/marathon.go @@ -40,13 +40,12 @@ func (md *MarathonDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) { +func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) for { select { - case <-md.done: - log.Debug("Shutting down marathon discovery.") + case <-done: return case <-time.After(md.refreshInterval): err := md.updateServices(ch) @@ -57,11 +56,6 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) { } } -// Stop implements the TargetProvider interface. -func (md *MarathonDiscovery) Stop() { - md.done <- struct{}{} -} - func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error { targetMap, err := md.fetchTargetGroups() if err != nil { diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go index fcf11562c6..3b681cf790 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon_test.go @@ -181,17 +181,18 @@ func TestMarathonSDRunAndStop(t *testing.T) { return marathonTestAppList(marathonValidLabel, 1), nil }) md.refreshInterval = time.Millisecond * 10 + done := make(chan struct{}) go func() { select { case <-ch: - md.Stop() + close(done) case <-time.After(md.refreshInterval * 3): - md.Stop() + close(done) t.Fatalf("Update took too long.") } }() - md.Run(ch) + md.Run(ch, done) select { case <-ch: default: diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 5a84186fc6..b550989126 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -67,7 +67,6 @@ type ServersetDiscovery struct { sources map[string]*config.TargetGroup sdUpdates *chan<- *config.TargetGroup updates chan zookeeperTreeCacheEvent - runDone chan struct{} treeCache *zookeeperTreeCache } @@ -84,7 +83,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery { conn: conn, updates: updates, sources: map[string]*config.TargetGroup{}, - runDone: make(chan struct{}), } go sd.processUpdates() sd.treeCache = NewZookeeperTreeCache(conn, conf.Paths[0], updates) @@ -132,7 +130,7 @@ func (sd *ServersetDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) { +func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { // Send on everything we have seen so far. sd.mu.Lock() for _, targetGroup := range sd.sources { @@ -142,20 +140,10 @@ func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) { sd.sdUpdates = &ch sd.mu.Unlock() - <-sd.runDone + <-done sd.treeCache.Stop() } -// Stop implements the TargetProvider interface. -func (sd *ServersetDiscovery) Stop() { - log.Debugf("Stopping serverset service discovery for %s %s", sd.conf.Servers, sd.conf.Paths) - - // Terminate Run. - sd.runDone <- struct{}{} - - log.Debugf("Serverset service discovery for %s %s stopped", sd.conf.Servers, sd.conf.Paths) -} - func parseServersetMember(data []byte, path string) (*clientmodel.LabelSet, error) { member := serversetMember{} err := json.Unmarshal(data, &member) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index f72c93fe2c..f22cb5ab9a 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -53,17 +53,18 @@ type fakeTargetProvider struct { update chan *config.TargetGroup } -func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup) { +func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) - for tg := range tp.update { - ch <- tg + for { + select { + case tg := <-tp.update: + ch <- tg + case <-done: + return + } } } -func (tp *fakeTargetProvider) Stop() { - close(tp.update) -} - func (tp *fakeTargetProvider) Sources() []string { return tp.sources } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index ba64f55e81..81984a7515 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -43,20 +43,19 @@ type TargetProvider interface { // Run hands a channel to the target provider through which it can send // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. - Run(chan<- *config.TargetGroup) - // Stop terminates any potential computation of the target provider. The - // channel received on Run must be closed afterwards. - Stop() + // On receiving from done Run must return. + Run(up chan<- *config.TargetGroup, done <-chan struct{}) } // TargetManager maintains a set of targets, starts and stops their scraping and // creates the new targets based on the target groups it receives from various // target providers. type TargetManager struct { - m sync.RWMutex + mtx sync.RWMutex globalLabels clientmodel.LabelSet sampleAppender storage.SampleAppender running bool + done chan struct{} // Targets by their source ID. targets map[string][]*Target @@ -73,31 +72,96 @@ func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager { return tm } +// merge multiple target group channels into a single output channel. +func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate { + var wg sync.WaitGroup + out := make(chan targetGroupUpdate) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c or done is closed, then calls + // wg.Done. + redir := func(c <-chan targetGroupUpdate) { + defer wg.Done() + for n := range c { + select { + case out <- n: + case <-done: + return + } + } + } + + wg.Add(len(cs)) + for _, c := range cs { + go redir(c) + } + + // Close the out channel if all inbound channels are closed. + go func() { + wg.Wait() + close(out) + }() + return out +} + +// targetGroupUpdate is a potentially changed/new target group +// for the given scrape configuration. +type targetGroupUpdate struct { + tg *config.TargetGroup + scfg *config.ScrapeConfig +} + // Run starts background processing to handle target updates. func (tm *TargetManager) Run() { log.Info("Starting target manager...") + tm.done = make(chan struct{}) + sources := map[string]struct{}{} + updates := []<-chan targetGroupUpdate{} for scfg, provs := range tm.providers { for _, prov := range provs { - ch := make(chan *config.TargetGroup) - go tm.handleTargetUpdates(scfg, ch) - + // Get an initial set of available sources so we don't remove + // target groups from the last run that are still available. for _, src := range prov.Sources() { sources[src] = struct{}{} } + tgc := make(chan *config.TargetGroup) // Run the target provider after cleanup of the stale targets is done. - defer func(p TargetProvider, c chan *config.TargetGroup) { - go p.Run(c) - }(prov, ch) + defer func(prov TargetProvider, tgc chan *config.TargetGroup) { + go prov.Run(tgc, tm.done) + }(prov, tgc) + + tgupc := make(chan targetGroupUpdate) + updates = append(updates, tgupc) + + go func(scfg *config.ScrapeConfig) { + defer close(tgupc) + for { + select { + case tg := <-tgc: + if tg == nil { + break + } + tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} + case <-tm.done: + return + } + } + }(scfg) } } - tm.m.Lock() - defer tm.m.Unlock() + // Merge all channels of incoming target group updates into a single + // one and keep applying the updates. + go tm.handleUpdates(merge(tm.done, updates...), tm.done) + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Remove old target groups that are no longer in the set of sources. tm.removeTargets(func(src string) bool { if _, ok := sources[src]; ok { return false @@ -110,24 +174,32 @@ func (tm *TargetManager) Run() { // handleTargetUpdates receives target group updates and handles them in the // context of the given job config. -func (tm *TargetManager) handleTargetUpdates(cfg *config.ScrapeConfig, ch <-chan *config.TargetGroup) { - for tg := range ch { - log.Debugf("Received potential update for target group %q", tg.Source) +func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) { + for { + select { + case update := <-ch: + if update.tg == nil { + break + } + log.Debugf("Received potential update for target group %q", update.tg.Source) - if err := tm.updateTargetGroup(tg, cfg); err != nil { - log.Errorf("Error updating targets: %s", err) + if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil { + log.Errorf("Error updating targets: %s", err) + } + case <-done: + return } } } // Stop all background processing. func (tm *TargetManager) Stop() { - tm.m.RLock() + tm.mtx.RLock() if tm.running { defer tm.stop(true) } // Return the lock before calling tm.stop(). - defer tm.m.RUnlock() + defer tm.mtx.RUnlock() } // stop background processing of the target manager. If removeTargets is true, @@ -136,25 +208,10 @@ func (tm *TargetManager) stop(removeTargets bool) { log.Info("Stopping target manager...") defer log.Info("Target manager stopped.") - tm.m.Lock() - provs := []TargetProvider{} - for _, ps := range tm.providers { - provs = append(provs, ps...) - } - tm.m.Unlock() + close(tm.done) - var wg sync.WaitGroup - wg.Add(len(provs)) - for _, prov := range provs { - go func(p TargetProvider) { - p.Stop() - wg.Done() - }(prov) - } - wg.Wait() - - tm.m.Lock() - defer tm.m.Unlock() + tm.mtx.Lock() + defer tm.mtx.Unlock() if removeTargets { tm.removeTargets(nil) @@ -194,8 +251,8 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf return err } - tm.m.Lock() - defer tm.m.Unlock() + tm.mtx.Lock() + defer tm.mtx.Unlock() if !tm.running { return nil @@ -261,8 +318,8 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf // Pools returns the targets currently being scraped bucketed by their job name. func (tm *TargetManager) Pools() map[string][]*Target { - tm.m.RLock() - defer tm.m.RUnlock() + tm.mtx.RLock() + defer tm.mtx.RUnlock() pools := map[string][]*Target{} @@ -279,9 +336,9 @@ func (tm *TargetManager) Pools() map[string][]*Target { // by the new cfg. The state of targets that are valid in the new configuration remains unchanged. // Returns true on success. func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { - tm.m.RLock() + tm.mtx.RLock() running := tm.running - tm.m.RUnlock() + tm.mtx.RUnlock() if running { tm.stop(false) @@ -294,8 +351,8 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { providers[scfg] = providersFromConfig(scfg) } - tm.m.Lock() - defer tm.m.Unlock() + tm.mtx.Lock() + defer tm.mtx.Unlock() tm.globalLabels = cfg.GlobalConfig.Labels tm.providers = providers @@ -325,15 +382,23 @@ func (tp *prefixedTargetProvider) Sources() []string { return srcs } -func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup) { +func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) ch2 := make(chan *config.TargetGroup) - go tp.TargetProvider.Run(ch2) + go tp.TargetProvider.Run(ch2, done) - for tg := range ch2 { - tg.Source = tp.prefix(tg.Source) - ch <- tg + for { + select { + case <-done: + return + case tg := <-ch2: + if tg == nil { + break + } + tg.Source = tp.prefix(tg.Source) + ch <- tg + } } } @@ -382,8 +447,8 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { // targetsFromGroup builds targets based on the given TargetGroup and config. func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { - tm.m.RLock() - defer tm.m.RUnlock() + tm.mtx.RLock() + defer tm.mtx.RUnlock() targets := make([]*Target, 0, len(tg.Targets)) for i, labels := range tg.Targets { @@ -470,15 +535,17 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { } // Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup) { - for _, tg := range sd.TargetGroups { - ch <- tg - } - close(ch) // This provider never sends any updates. -} +func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { + defer close(ch) -// Stop implements the TargetProvider interface. -func (sd *StaticProvider) Stop() {} + for _, tg := range sd.TargetGroups { + select { + case <-done: + return + case ch <- tg: + } + } +} // TargetGroups returns the provider's target groups. func (sd *StaticProvider) Sources() (srcs []string) { diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index a911ce8df7..8d9f3dc407 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -54,7 +54,10 @@ func TestPrefixedTargetProvider(t *testing.T) { } ch := make(chan *config.TargetGroup) - go tp.Run(ch) + done := make(chan struct{}) + + defer close(done) + go tp.Run(ch, done) expGroup1 := *targetGroups[0] expGroup2 := *targetGroups[1] @@ -347,10 +350,10 @@ func TestTargetManagerConfigUpdate(t *testing.T) { conf.ScrapeConfigs = step.scrapeConfigs targetManager.ApplyConfig(conf) - <-time.After(1 * time.Millisecond) + time.Sleep(50 * time.Millisecond) if len(targetManager.targets) != len(step.expected) { - t.Fatalf("step %d: sources mismatch %v, %v", targetManager.targets, step.expected) + t.Fatalf("step %d: sources mismatch: expected %v, got %v", i, step.expected, targetManager.targets) } for source, actTargets := range targetManager.targets {