diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 84ed6d7ac1..5120081700 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -15,7 +15,6 @@ package discovery import ( "fmt" - "net/http" "strconv" "strings" "sync" @@ -24,6 +23,7 @@ import ( consul "github.com/hashicorp/consul/api" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -113,52 +113,24 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) (*ConsulDiscovery, error) { return cd, nil } -// Sources implements the TargetProvider interface. -func (cd *ConsulDiscovery) Sources() []string { - clientConf := *cd.clientConf - clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second} - - client, err := consul.NewClient(&clientConf) - if err != nil { - // NewClient always returns a nil error. - panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err)) - } - - srvs, _, err := client.Catalog().Services(nil) - if err != nil { - log.Errorf("Error refreshing service list: %s", err) - return nil - } - cd.mu.Lock() - defer cd.mu.Unlock() - - srcs := make([]string, 0, len(srvs)) - for name := range srvs { - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) == 0 || ok { - srcs = append(srcs, name) - } - } - return srcs -} - // Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (cd *ConsulDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) defer cd.stop() update := make(chan *consulService, 10) - go cd.watchServices(update, done) + go cd.watchServices(update, ctx.Done()) for { select { - case <-done: + case <-ctx.Done(): return case srv := <-update: if srv.removed { close(srv.done) // Send clearing update. 
- ch <- config.TargetGroup{Source: srv.name} + ch <- []*config.TargetGroup{{Source: srv.name}} break } // Launch watcher for the service. @@ -244,7 +216,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch // watchService retrieves updates about srv from Consul's service endpoint. // On a potential update the resulting target group is sent to ch. -func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) { +func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { catalog := cd.client.Catalog() for { nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ @@ -288,7 +260,11 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.Tar default: // Continue. } - ch <- srv.tgroup + // TODO(fabxc): do a copy for now to avoid races. The integration + // needs some general cleanup. + tg := srv.tgroup + ch <- []*config.TargetGroup{&tg} + cd.mu.Unlock() } } diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go index 40a8841e1f..3c75f07fc7 100644 --- a/retrieval/discovery/dns.go +++ b/retrieval/discovery/dns.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -91,7 +92,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery { } // Run implements the TargetProvider interface. -func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (dd *DNSDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) ticker := time.NewTicker(dd.interval) @@ -104,22 +105,13 @@ func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) select { case <-ticker.C: dd.refreshAll(ch) - case <-done: + case <-ctx.Done(): return } } } -// Sources implements the TargetProvider interface. 
-func (dd *DNSDiscovery) Sources() []string { - var srcs []string - for _, name := range dd.names { - srcs = append(srcs, name) - } - return srcs -} - -func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) { +func (dd *DNSDiscovery) refreshAll(ch chan<- []*config.TargetGroup) { var wg sync.WaitGroup wg.Add(len(dd.names)) for _, name := range dd.names { @@ -133,7 +125,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) { wg.Wait() } -func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error { +func (dd *DNSDiscovery) refresh(name string, ch chan<- []*config.TargetGroup) error { response, err := lookupAll(name, dd.qtype) dnsSDLookupsCount.Inc() if err != nil { @@ -141,7 +133,8 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error return err } - var tg config.TargetGroup + tg := &config.TargetGroup{} + for _, record := range response.Answer { target := model.LabelValue("") switch addr := record.(type) { @@ -166,7 +159,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error } tg.Source = name - ch <- tg + ch <- []*config.TargetGroup{tg} return nil } diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go index 46b3d371f4..e390169486 100644 --- a/retrieval/discovery/ec2.go +++ b/retrieval/discovery/ec2.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-sdk-go/aws/defaults" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/aws/aws-sdk-go/service/ec2" "github.com/prometheus/prometheus/config" @@ -46,7 +47,6 @@ const ( // the TargetProvider interface. 
type EC2Discovery struct { aws *aws.Config - done chan struct{} interval time.Duration port int } @@ -62,14 +62,13 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { Region: &conf.Region, Credentials: creds, }, - done: make(chan struct{}), interval: time.Duration(conf.RefreshInterval), port: conf.Port, } } // Run implements the TargetProvider interface. -func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) ticker := time.NewTicker(ed.interval) @@ -80,7 +79,7 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- *tg + ch <- []*config.TargetGroup{tg} } for { @@ -90,19 +89,14 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- *tg + ch <- []*config.TargetGroup{tg} } - case <-done: + case <-ctx.Done(): return } } } -// Sources implements the TargetProvider interface. -func (ed *EC2Discovery) Sources() []string { - return []string{*ed.aws.Region} -} - func (ed *EC2Discovery) refresh() (*config.TargetGroup, error) { ec2s := ec2.New(ed.aws) tg := &config.TargetGroup{ diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index eb0411b30f..9aedb16058 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "gopkg.in/fsnotify.v1" "gopkg.in/yaml.v2" @@ -53,23 +54,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery { } } -// Sources implements the TargetProvider interface. -func (fd *FileDiscovery) Sources() []string { - var srcs []string - // As we allow multiple target groups per file we have no choice - // but to parse them all. 
- for _, p := range fd.listFiles() { - tgroups, err := readFile(p) - if err != nil { - log.Errorf("Error reading file %q: %s", p, err) - } - for _, tg := range tgroups { - srcs = append(srcs, tg.Source) - } - } - return srcs -} - // listFiles returns a list of all files that match the configured patterns. func (fd *FileDiscovery) listFiles() []string { var paths []string @@ -103,7 +87,7 @@ func (fd *FileDiscovery) watchFiles() { } // Run implements the TargetProvider interface. -func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) defer fd.stop() @@ -123,11 +107,11 @@ func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) // Stopping has priority over refreshing. Thus we wrap the actual select // clause to always catch done signals. select { - case <-done: + case <-ctx.Done(): return default: select { - case <-done: + case <-ctx.Done(): return case event := <-fd.watcher.Events: @@ -188,7 +172,7 @@ func (fd *FileDiscovery) stop() { // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. -func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { +func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { ref := map[string]int{} for _, p := range fd.listFiles() { tgroups, err := readFile(p) @@ -198,9 +182,8 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { ref[p] = fd.lastRefresh[p] continue } - for _, tg := range tgroups { - ch <- *tg - } + ch <- tgroups + ref[p] = len(tgroups) } // Send empty updates for sources that disappeared. 
@@ -208,7 +191,9 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { - ch <- config.TargetGroup{Source: fileSource(f, i)} + ch <- []*config.TargetGroup{ + {Source: fileSource(f, i)}, + } } } } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 4c1407666a..2e1babce14 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -27,17 +28,17 @@ func testFileSD(t *testing.T, ext string) { conf.RefreshInterval = model.Duration(1 * time.Hour) var ( - fsd = NewFileDiscovery(&conf) - ch = make(chan config.TargetGroup) - done = make(chan struct{}) + fsd = NewFileDiscovery(&conf) + ch = make(chan []*config.TargetGroup) + ctx, cancel = context.WithCancel(context.Background()) ) - go fsd.Run(ch, done) + go fsd.Run(ctx, ch) select { case <-time.After(25 * time.Millisecond): // Expected. - case tg := <-ch: - t.Fatalf("Unexpected target group in file discovery: %s", tg) + case tgs := <-ch: + t.Fatalf("Unexpected target groups in file discovery: %s", tgs) } newf, err := os.Create("fixtures/_test" + ext) @@ -58,37 +59,37 @@ func testFileSD(t *testing.T, ext string) { } newf.Close() - // The files contain two target groups which are read and sent in order. + // The files contain two target groups. 
select { case <-time.After(15 * time.Second): t.Fatalf("Expected new target group but got none") - case tg := <-ch: + case tgs := <-ch: + tg := tgs[0] + if _, ok := tg.Labels["foo"]; !ok { t.Fatalf("Label not parsed") } if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) { t.Fatalf("Unexpected target group %s", tg) } - } - select { - case <-time.After(15 * time.Second): - t.Fatalf("Expected new target group but got none") - case tg := <-ch: + + tg = tgs[1] if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) { - t.Fatalf("Unexpected target group %s", tg) + t.Fatalf("Unexpected target groups %s", tg) } } + // Based on unknown circumstances, sometimes fsnotify will trigger more events in // some runs (which might be empty, chains of different operations etc.). // We have to drain those (as the target manager would) to avoid deadlocking and must // not try to make sense of it all... drained := make(chan struct{}) go func() { - for tg := range ch { + for tgs := range ch { // Below we will change the file to a bad syntax. Previously extracted target // groups must not be deleted via sending an empty target group. 
- if len(tg.Targets) == 0 { - t.Errorf("Unexpected empty target group received: %s", tg) + if len(tgs[0].Targets) == 0 { + t.Errorf("Unexpected empty target groups received: %s", tgs) } } close(drained) @@ -107,6 +108,6 @@ func testFileSD(t *testing.T, ext string) { os.Rename(newf.Name(), "fixtures/_test"+ext) - close(done) + cancel() <-drained } diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index a23a3e08e4..fd7974f8f0 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/httputil" @@ -94,75 +95,35 @@ func (kd *Discovery) Initialize() error { return nil } -// Sources implements the TargetProvider interface. -func (kd *Discovery) Sources() []string { - sourceNames := make([]string, 0, len(kd.apiServers)) - for _, apiServer := range kd.apiServers { - sourceNames = append(sourceNames, apiServersTargetGroupName+":"+apiServer.Host) - } - - nodes, _, err := kd.getNodes() - if err != nil { - // If we can't list nodes then we can't watch them. Assume this is a misconfiguration - // & log & return empty. - log.Errorf("Unable to initialize Kubernetes nodes: %s", err) - return []string{} - } - sourceNames = append(sourceNames, kd.nodeSources(nodes)...) - - services, _, err := kd.getServices() - if err != nil { - // If we can't list services then we can't watch them. Assume this is a misconfiguration - // & log & return empty. - log.Errorf("Unable to initialize Kubernetes services: %s", err) - return []string{} - } - sourceNames = append(sourceNames, kd.serviceSources(services)...) 
- - return sourceNames -} - -func (kd *Discovery) nodeSources(nodes map[string]*Node) []string { - var sourceNames []string - for name := range nodes { - sourceNames = append(sourceNames, nodesTargetGroupName+":"+name) - } - return sourceNames -} - -func (kd *Discovery) serviceSources(services map[string]map[string]*Service) []string { - var sourceNames []string - for _, ns := range services { - for _, service := range ns { - sourceNames = append(sourceNames, serviceSource(service)) - } - } - return sourceNames -} - // Run implements the TargetProvider interface. -func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) - if tg := kd.updateAPIServersTargetGroup(); tg != nil { - select { - case ch <- *tg: - case <-done: - return - } + // Send an initial full view. + // TODO(fabxc): this does not include all available services and service + // endpoints yet. Service endpoints were also missing in the previous Sources() method. 
+ var all []*config.TargetGroup + + all = append(all, kd.updateAPIServersTargetGroup()) + all = append(all, kd.updateNodesTargetGroup()) + + select { + case ch <- all: + case <-ctx.Done(): + return } retryInterval := time.Duration(kd.Conf.RetryInterval) update := make(chan interface{}, 10) - go kd.watchNodes(update, done, retryInterval) - go kd.startServiceWatch(update, done, retryInterval) + go kd.watchNodes(update, ctx.Done(), retryInterval) + go kd.startServiceWatch(update, ctx.Done(), retryInterval) var tg *config.TargetGroup for { select { - case <-done: + case <-ctx.Done(): return case event := <-update: switch obj := event.(type) { @@ -181,8 +142,8 @@ func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { } select { - case ch <- *tg: - case <-done: + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): return } } diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go index 181ea440f4..6361a78732 100644 --- a/retrieval/discovery/marathon.go +++ b/retrieval/discovery/marathon.go @@ -17,6 +17,8 @@ import ( "time" "github.com/prometheus/common/log" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) @@ -40,25 +42,13 @@ func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery { } } -// Sources implements the TargetProvider interface. -func (md *MarathonDiscovery) Sources() []string { - var sources []string - tgroups, err := md.fetchTargetGroups() - if err == nil { - for source := range tgroups { - sources = append(sources, source) - } - } - return sources -} - // Run implements the TargetProvider interface. 
-func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (md *MarathonDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) for { select { - case <-done: + case <-ctx.Done(): return case <-time.After(md.refreshInterval): err := md.updateServices(ch) @@ -69,23 +59,24 @@ func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struc } } -func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error { +func (md *MarathonDiscovery) updateServices(ch chan<- []*config.TargetGroup) error { targetMap, err := md.fetchTargetGroups() if err != nil { return err } - // Update services which are still present + all := make([]*config.TargetGroup, 0, len(targetMap)) for _, tg := range targetMap { - ch <- *tg + all = append(all, tg) } + ch <- all // Remove services which did disappear for source := range md.lastRefresh { _, ok := targetMap[source] if !ok { log.Debugf("Removing group for %s", source) - ch <- config.TargetGroup{Source: source} + ch <- []*config.TargetGroup{{Source: source}} } } diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go index f54d828077..cc58a13808 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery/marathon" @@ -26,8 +27,8 @@ import ( var marathonValidLabel = map[string]string{"prometheus": "yes"} -func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) { - ch := make(chan config.TargetGroup) +func newTestDiscovery(client marathon.AppListClient) (chan []*config.TargetGroup, *MarathonDiscovery) { + ch := make(chan []*config.TargetGroup) md := NewMarathonDiscovery(&config.MarathonSDConfig{ Servers: []string{"http://localhost:8080"}, }) @@ -60,7 
+61,9 @@ func TestMarathonSDEmptyList(t *testing.T) { go func() { select { case tg := <-ch: - t.Fatalf("Got group: %v", tg) + if len(tg) > 0 { + t.Fatalf("Got group: %v", tg) + } default: } }() @@ -96,7 +99,9 @@ func TestMarathonSDSendGroup(t *testing.T) { }) go func() { select { - case tg := <-ch: + case tgs := <-ch: + tg := tgs[0] + if tg.Source != "test-service" { t.Fatalf("Wrong target group name: %s", tg.Source) } @@ -121,9 +126,10 @@ func TestMarathonSDRemoveApp(t *testing.T) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { return marathonTestAppList(marathonValidLabel, 1), nil }) + go func() { - up1 := <-ch - up2 := <-ch + up1 := (<-ch)[0] + up2 := (<-ch)[0] if up2.Source != up1.Source { t.Fatalf("Source is different: %s", up2) if len(up2.Targets) > 0 { @@ -145,33 +151,25 @@ func TestMarathonSDRemoveApp(t *testing.T) { } } -func TestMarathonSDSources(t *testing.T) { - _, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { - return marathonTestAppList(marathonValidLabel, 1), nil - }) - sources := md.Sources() - if len(sources) != 1 { - t.Fatalf("Wrong number of sources: %s", sources) - } -} - func TestMarathonSDRunAndStop(t *testing.T) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { return marathonTestAppList(marathonValidLabel, 1), nil }) md.refreshInterval = time.Millisecond * 10 - done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) go func() { select { case <-ch: - close(done) + cancel() case <-time.After(md.refreshInterval * 3): - close(done) + cancel() t.Fatalf("Update took too long.") } }() - md.Run(ch, done) + + md.Run(ctx, ch) + select { case <-ch: default: diff --git a/retrieval/discovery/nerve.go b/retrieval/discovery/nerve.go index d234160627..f99c9d61f5 100644 --- a/retrieval/discovery/nerve.go +++ b/retrieval/discovery/nerve.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/common/model" "github.com/samuel/go-zookeeper/zk" + 
"golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/treecache" @@ -47,7 +48,7 @@ type NerveDiscovery struct { conn *zk.Conn mu sync.RWMutex sources map[string]*config.TargetGroup - sdUpdates *chan<- config.TargetGroup + sdUpdates *chan<- []*config.TargetGroup updates chan treecache.ZookeeperTreeCacheEvent treeCaches []*treecache.ZookeeperTreeCache } @@ -73,17 +74,6 @@ func NewNerveDiscovery(conf *config.NerveSDConfig) *NerveDiscovery { return sd } -// Sources implements the TargetProvider interface. -func (sd *NerveDiscovery) Sources() []string { - sd.mu.RLock() - defer sd.mu.RUnlock() - srcs := []string{} - for t := range sd.sources { - srcs = append(srcs, t) - } - return srcs -} - func (sd *NerveDiscovery) processUpdates() { defer sd.conn.Close() for event := range sd.updates { @@ -104,7 +94,7 @@ func (sd *NerveDiscovery) processUpdates() { } sd.mu.Unlock() if sd.sdUpdates != nil { - *sd.sdUpdates <- *tg + *sd.sdUpdates <- []*config.TargetGroup{tg} } } @@ -114,17 +104,22 @@ func (sd *NerveDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *NerveDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (sd *NerveDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Send on everything we have seen so far. sd.mu.Lock() - for _, targetGroup := range sd.sources { - ch <- *targetGroup + + all := make([]*config.TargetGroup, 0, len(sd.sources)) + + for _, tg := range sd.sources { + all = append(all, tg) } + ch <- all + // Tell processUpdates to send future updates. 
sd.sdUpdates = &ch sd.mu.Unlock() - <-done + <-ctx.Done() for _, tc := range sd.treeCaches { tc.Stop() } diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index d1172b2438..5a13af63cc 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/samuel/go-zookeeper/zk" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" @@ -57,7 +58,7 @@ type ServersetDiscovery struct { conn *zk.Conn mu sync.RWMutex sources map[string]*config.TargetGroup - sdUpdates *chan<- config.TargetGroup + sdUpdates *chan<- []*config.TargetGroup updates chan treecache.ZookeeperTreeCacheEvent treeCaches []*treecache.ZookeeperTreeCache } @@ -83,17 +84,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery { return sd } -// Sources implements the TargetProvider interface. -func (sd *ServersetDiscovery) Sources() []string { - sd.mu.RLock() - defer sd.mu.RUnlock() - srcs := []string{} - for t := range sd.sources { - srcs = append(srcs, t) - } - return srcs -} - func (sd *ServersetDiscovery) processUpdates() { defer sd.conn.Close() for event := range sd.updates { @@ -114,7 +104,7 @@ func (sd *ServersetDiscovery) processUpdates() { } sd.mu.Unlock() if sd.sdUpdates != nil { - *sd.sdUpdates <- *tg + *sd.sdUpdates <- []*config.TargetGroup{tg} } } @@ -124,17 +114,22 @@ func (sd *ServersetDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (sd *ServersetDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Send on everything we have seen so far. 
sd.mu.Lock() - for _, targetGroup := range sd.sources { - ch <- *targetGroup + + all := make([]*config.TargetGroup, 0, len(sd.sources)) + + for _, tg := range sd.sources { + all = append(all, tg) } + ch <- all + // Tell processUpdates to send future updates. sd.sdUpdates = &ch sd.mu.Unlock() - <-done + <-ctx.Done() for _, tc := range sd.treeCaches { tc.Stop() } @@ -142,8 +137,8 @@ func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan stru func parseServersetMember(data []byte, path string) (*model.LabelSet, error) { member := serversetMember{} - err := json.Unmarshal(data, &member) - if err != nil { + + if err := json.Unmarshal(data, &member); err != nil { return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err) } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index e223cc332e..7328c9e03f 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -40,7 +40,7 @@ type TargetProvider interface { // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. // On receiving from done Run must return. 
- Run(up chan<- config.TargetGroup, done <-chan struct{}) + Run(ctx context.Context, up chan<- []*config.TargetGroup) } // TargetManager maintains a set of targets, starts and stops their scraping and @@ -178,7 +178,7 @@ func (ss *scrapeSet) run(ctx context.Context) { for name, prov := range providers { var ( - updates = make(chan config.TargetGroup) + updates = make(chan []*config.TargetGroup) ) wg.Add(1) @@ -192,23 +192,17 @@ func (ss *scrapeSet) run(ctx context.Context) { case <-ctx.Done(): ss.stopScrapers(name) return - case update := <-updates: - if err := ss.update(name, &update); err != nil { - log.With("target_group", update).Errorf("Target update failed: %s", err) + case tgs := <-updates: + for _, tg := range tgs { + if err := ss.update(name, tg); err != nil { + log.With("target_group", tg).Errorf("Target update failed: %s", err) + } } } } }(name, prov) - done := make(chan struct{}) - - // TODO(fabxc): Adjust the TargetProvider interface so we can remove this - // redirection of the termination signal. - go func() { - <-ctx.Done() - close(done) - }() - go prov.Run(updates, done) + go prov.Run(ctx, updates) } wg.Wait() @@ -421,21 +415,16 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { for i, tg := range groups { tg.Source = fmt.Sprintf("%d", i) } - return &StaticProvider{ - TargetGroups: groups, - } + return &StaticProvider{groups} } // Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { - defer close(ch) - - for _, tg := range sd.TargetGroups { - select { - case <-done: - return - case ch <- *tg: - } +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): } - <-done + close(ch) }