diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go index 871a291a9f..78c43e16d0 100644 --- a/retrieval/discovery/consul/consul.go +++ b/retrieval/discovery/consul/consul.go @@ -17,7 +17,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" consul "github.com/hashicorp/consul/api" @@ -57,20 +56,7 @@ type Discovery struct { clientConf *consul.Config clientDatacenter string tagSeparator string - scrapedServices map[string]struct{} - - mu sync.RWMutex - services map[string]*consulService -} - -// consulService contains data belonging to the same service. -type consulService struct { - name string - tgroup config.TargetGroup - lastIndex uint64 - removed bool - running bool - done chan struct{} + scrapedServices map[string]struct{} // Set of services which will be discovered. } // NewDiscovery returns a new Discovery for the given config. @@ -94,7 +80,6 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { clientConf: clientConf, tagSeparator: conf.TagSeparator, scrapedServices: map[string]struct{}{}, - services: map[string]*consulService{}, } // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent // (Consul default is to use local node's datacenter if one isn't given for a query). @@ -116,46 +101,10 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { // Run implements the TargetProvider interface. func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) - defer cd.stop() - update := make(chan *consulService, 10) - go cd.watchServices(update, ctx.Done()) + // Watched services and their cancelation functions. + services := map[string]func(){} - for { - select { - case <-ctx.Done(): - return - case srv := <-update: - if srv.removed { - close(srv.done) - - // Send clearing update. - ch <- []*config.TargetGroup{{Source: srv.name}} - break - } - // Launch watcher for the service. - if !srv.running { - go cd.watchService(srv, ch) - srv.running = true - } - } - } -} - -func (cd *Discovery) stop() { - // The lock prevents Run from terminating while the watchers attempt - // to send on their channels. - cd.mu.Lock() - defer cd.mu.Unlock() - - for _, srv := range cd.services { - close(srv.done) - } -} - -// watchServices retrieves updates from Consul's services endpoint and sends -// potential updates to the update channel. -func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan struct{}) { var lastIndex uint64 for { catalog := cd.client.Catalog() @@ -163,6 +112,15 @@ func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan str WaitIndex: lastIndex, WaitTime: watchTimeout, }) + + // We have to check the context at least once. The checks during channel sends + // do not guarantee that. + select { + case <-ctx.Done(): + return + default: + } + if err != nil { log.Errorf("Error refreshing service list: %s", err) time.Sleep(retryInterval) @@ -174,74 +132,100 @@ func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan str } lastIndex = meta.LastIndex - cd.mu.Lock() + // Check for new services. + for name := range srvs { + // If no restriction on scraped services is set, we scrape everything. + if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { + continue + } + if _, ok := services[name]; ok { + continue // We are already watching the service. + } + + srv := &consulService{ + client: cd.client, + name: name, + labels: model.LabelSet{ + serviceLabel: model.LabelValue(name), + datacenterLabel: model.LabelValue(cd.clientDatacenter), + }, + tagSeparator: cd.tagSeparator, + } + + wctx, cancel := context.WithCancel(ctx) + go srv.watch(wctx, ch) + + services[name] = cancel + } + + // Check for removed services. + for name, cancel := range services { + if _, ok := srvs[name]; !ok { + // Call the watch cancelation function. + cancel() + delete(services, name) + + // Send clearing target group. + select { + case <-ctx.Done(): + return + case ch <- []*config.TargetGroup{{Source: name}}: + } + } + } + } +} + +// consulService contains data belonging to the same service. +type consulService struct { + name string + labels model.LabelSet + client *consul.Client + tagSeparator string +} + +func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) { + catalog := srv.client.Catalog() + + lastIndex := uint64(0) + for { + nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ + WaitIndex: lastIndex, + WaitTime: watchTimeout, + }) + // Check the context before potentially falling in a continue-loop. select { - case <-done: - cd.mu.Unlock() + case <-ctx.Done(): return default: // Continue. } - // Check for new services. - for name := range srvs { - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { - continue - } - srv, ok := cd.services[name] - if !ok { - srv = &consulService{ - name: name, - done: make(chan struct{}), - } - srv.tgroup.Source = name - cd.services[name] = srv - } - srv.tgroup.Labels = model.LabelSet{ - serviceLabel: model.LabelValue(name), - datacenterLabel: model.LabelValue(cd.clientDatacenter), - } - update <- srv - } - // Check for removed services. - for name, srv := range cd.services { - if _, ok := srvs[name]; !ok { - srv.removed = true - update <- srv - delete(cd.services, name) - } - } - cd.mu.Unlock() - } -} -// watchService retrieves updates about srv from Consul's service endpoint. -// On a potential update the resulting target group is sent to ch. -func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { - catalog := cd.client.Catalog() - for { - nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ - WaitIndex: srv.lastIndex, - WaitTime: watchTimeout, - }) if err != nil { log.Errorf("Error refreshing service %s: %s", srv.name, err) time.Sleep(retryInterval) continue } // If the index equals the previous one, the watch timed out with no update. - if meta.LastIndex == srv.lastIndex { + if meta.LastIndex == lastIndex { continue } - srv.lastIndex = meta.LastIndex - srv.tgroup.Targets = make([]model.LabelSet, 0, len(nodes)) + lastIndex = meta.LastIndex + + tgroup := config.TargetGroup{ + Source: srv.name, + Labels: srv.labels, + Targets: make([]model.LabelSet, 0, len(nodes)), + } for _, node := range nodes { - addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) - // We surround the separated list with the separator as well. This way regular expressions - // in relabeling rules don't have to consider tag positions. - tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator - - srv.tgroup.Targets = append(srv.tgroup.Targets, model.LabelSet{ + var ( + addr = fmt.Sprintf("%s:%d", node.Address, node.ServicePort) + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + tags = srv.tagSeparator + strings.Join(node.ServiceTags, srv.tagSeparator) + srv.tagSeparator + ) + tgroup.Targets = append(tgroup.Targets, model.LabelSet{ model.AddressLabel: model.LabelValue(addr), addressLabel: model.LabelValue(node.Address), nodeLabel: model.LabelValue(node.Node), @@ -251,20 +235,16 @@ func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.Target serviceIDLabel: model.LabelValue(node.ServiceID), }) } - - cd.mu.Lock() + // Check context twice to ensure we always catch cancelation. select { - case <-srv.done: - cd.mu.Unlock() + case <-ctx.Done(): return default: - // Continue. } - // TODO(fabxc): do a copy for now to avoid races. The integration - // needs needs some general cleanup. - tg := srv.tgroup - ch <- []*config.TargetGroup{&tg} - - cd.mu.Unlock() + select { + case <-ctx.Done(): + return + case ch <- []*config.TargetGroup{&tgroup}: + } } }