From e03e138d34cd9f38bb374ff39645c5bf38fa953e Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 1 Jul 2016 10:54:10 +0200 Subject: [PATCH 1/7] discovery: consolidate constructors into single file --- retrieval/discovery/consul.go | 24 --------------- .../discovery/{kubernetes.go => discovery.go} | 26 +++++++++++++++- retrieval/discovery/dns.go | 25 ---------------- retrieval/discovery/marathon.go | 30 ------------------- 4 files changed, 25 insertions(+), 80 deletions(-) delete mode 100644 retrieval/discovery/consul.go rename retrieval/discovery/{kubernetes.go => discovery.go} (55%) delete mode 100644 retrieval/discovery/dns.go delete mode 100644 retrieval/discovery/marathon.go diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go deleted file mode 100644 index 00f388d650..0000000000 --- a/retrieval/discovery/consul.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package discovery - -import ( - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/retrieval/discovery/consul" -) - -// NewConsul creates a new Consul based Discovery. 
-func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { - return consul.NewDiscovery(cfg) -} diff --git a/retrieval/discovery/kubernetes.go b/retrieval/discovery/discovery.go similarity index 55% rename from retrieval/discovery/kubernetes.go rename to retrieval/discovery/discovery.go index bc1f73cc8c..795de81583 100644 --- a/retrieval/discovery/kubernetes.go +++ b/retrieval/discovery/discovery.go @@ -1,4 +1,4 @@ -// Copyright 2015 The Prometheus Authors +// Copyright 2016 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,10 +14,20 @@ package discovery import ( + "time" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery/consul" + "github.com/prometheus/prometheus/retrieval/discovery/dns" "github.com/prometheus/prometheus/retrieval/discovery/kubernetes" + "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) +// NewConsul creates a new Consul based Discovery. +func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { + return consul.NewDiscovery(cfg) +} + // NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration. func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discovery, error) { kd := &kubernetes.Discovery{ @@ -29,3 +39,17 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discov } return kd, nil } + +// NewMarathon creates a new Marathon based discovery. +func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery { + return &marathon.Discovery{ + Servers: conf.Servers, + RefreshInterval: time.Duration(conf.RefreshInterval), + Client: marathon.FetchApps, + } +} + +// NewDNS creates a new DNS based discovery. 
+func NewDNS(conf *config.DNSSDConfig) *dns.Discovery { + return dns.NewDiscovery(conf) +} diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go deleted file mode 100644 index c413efdfc8..0000000000 --- a/retrieval/discovery/dns.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package discovery - -import ( - "github.com/prometheus/prometheus/retrieval/discovery/dns" - - "github.com/prometheus/prometheus/config" -) - -// NewDNS creates a new DNS based discovery. -func NewDNS(conf *config.DNSSDConfig) *dns.Discovery { - return dns.NewDiscovery(conf) -} diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go deleted file mode 100644 index e22cf48f02..0000000000 --- a/retrieval/discovery/marathon.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package discovery - -import ( - "time" - - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/retrieval/discovery/marathon" -) - -// NewMarathon creates a new Marathon based discovery. -func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery { - return &marathon.Discovery{ - Servers: conf.Servers, - RefreshInterval: time.Duration(conf.RefreshInterval), - Client: marathon.FetchApps, - } -} From 8a97c211a800695150abf5c2e96cfb277db1d20b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 1 Jul 2016 12:17:17 +0200 Subject: [PATCH 2/7] discovery/kubernetes: extract pod discovery This change extracts pod discovery into its own type. --- retrieval/discovery/kubernetes/discovery.go | 287 ++--------------- retrieval/discovery/kubernetes/pod.go | 332 ++++++++++++++++++++ 2 files changed, 350 insertions(+), 269 deletions(-) create mode 100644 retrieval/discovery/kubernetes/pod.go diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index c2db40cb5d..1b7e02ca14 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -14,16 +14,13 @@ package kubernetes import ( - "bytes" "encoding/json" "fmt" "io/ioutil" "net" "net/http" "os" - "sort" "strconv" - "strings" "sync" "time" @@ -113,10 +110,8 @@ type Discovery struct { nodes map[string]*Node services map[string]map[string]*Service // map of namespace to (map of pod name to pod) - pods map[string]map[string]*Pod nodesMu sync.RWMutex servicesMu sync.RWMutex - podsMu sync.RWMutex runDone chan struct{} } @@ -140,6 +135,19 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { log.Debugf("Kubernetes Discovery.Run beginning") defer close(ch) + var wg sync.WaitGroup + + pd := &podDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + pods: map[string]map[string]*Pod{}, + kd: kd, + } + wg.Add(1) + go func() { + pd.run(ctx, ch) + wg.Done() + }() + // 
Send an initial full view. // TODO(fabxc): this does not include all available services and service // endpoints yet. Service endpoints were also missing in the previous Sources() method. @@ -148,22 +156,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { all = append(all, kd.updateAPIServersTargetGroup()) all = append(all, kd.updateNodesTargetGroup()) - pods, _, err := kd.getPods() - if err != nil { - log.Errorf("Cannot initialize pods collection: %s", err) - return - } - kd.podsMu.Lock() - kd.pods = pods - kd.podsMu.Unlock() - - all = append(all, kd.updatePodsTargetGroup()) - for _, ns := range kd.pods { - for _, pod := range ns { - all = append(all, kd.updatePodTargetGroup(pod)) - } - } - select { case ch <- all: case <-ctx.Done(): @@ -176,7 +168,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { go kd.watchNodes(update, ctx.Done(), retryInterval) go kd.startServiceWatch(update, ctx.Done(), retryInterval) - go kd.watchPods(update, ctx.Done(), retryInterval) for { tg := []*config.TargetGroup{} @@ -195,13 +186,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case *endpointsEvent: log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name) tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)) - case *podEvent: - log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name) - // Update the per-pod target group - kd.updatePod(obj.Pod, obj.EventType) - tg = append(tg, kd.updatePodTargetGroup(obj.Pod)) - // ...and update the all pods target group - tg = append(tg, kd.updatePodsTargetGroup()) } } @@ -217,6 +201,8 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } } } + + wg.Wait() } func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { @@ -227,6 +213,9 @@ func (kd 
*Discovery) queryAPIServerPath(path string) (*http.Response, error) { return kd.queryAPIServerReq(req) } +type client struct { +} + func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) { // Lock in case we need to rotate API servers to request. kd.apiServersMu.Lock() @@ -824,243 +813,3 @@ func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { } return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) } - -func (kd *Discovery) updatePod(pod *Pod, eventType EventType) { - kd.podsMu.Lock() - defer kd.podsMu.Unlock() - - switch eventType { - case Deleted: - if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok { - delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name) - if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 { - delete(kd.pods, pod.ObjectMeta.Namespace) - } - } - case Added, Modified: - if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok { - kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{} - } - kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod - } -} - -func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) { - res, err := kd.queryAPIServerPath(podsURL) - if err != nil { - return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) - } - - var pods PodList - if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body)) - } - - podMap := map[string]map[string]*Pod{} - for idx, pod := range pods.Items { - if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { - podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{} - } - log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) - 
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx] - } - - return podMap, pods.ResourceVersion, nil -} - -func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - pods, resourceVersion, err := kd.getPods() - if err != nil { - log.Errorf("Cannot initialize pods collection: %s", err) - return - } - kd.podsMu.Lock() - kd.pods = pods - kd.podsMu.Unlock() - - req, err := http.NewRequest("GET", podsURL, nil) - if err != nil { - log.Errorf("Cannot create pods request: %s", err) - return - } - - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch pods: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch pods: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event podEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch pods unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } - }, retryInterval, done) -} - -func podSource(pod *Pod) string { - return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name -} - -type ByContainerPort []ContainerPort - -func (a ByContainerPort) Len() int { return len(a) } -func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } -func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -type ByContainerName []Container - -func (a ByContainerName) Len() int { return len(a) } -func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } -func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { - var targets []model.LabelSet = 
make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) - if pod.PodStatus.PodIP == "" { - log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name) - return targets - } - - if pod.PodStatus.Phase != "Running" { - log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name) - return targets - } - - ready := "unknown" - for _, cond := range pod.PodStatus.Conditions { - if strings.ToLower(cond.Type) == "ready" { - ready = strings.ToLower(cond.Status) - } - } - - sort.Sort(ByContainerName(pod.PodSpec.Containers)) - - for _, container := range pod.PodSpec.Containers { - // Collect a list of TCP ports - // Sort by port number, ascending - // Product a target pointed at the first port - // Include a label containing all ports (portName=port,PortName=port,...,) - var tcpPorts []ContainerPort - var portLabel *bytes.Buffer = bytes.NewBufferString(",") - - for _, port := range container.Ports { - if port.Protocol == "TCP" { - tcpPorts = append(tcpPorts, port) - } - } - - if len(tcpPorts) == 0 { - log.Debugf("skipping container %s with no TCP ports", container.Name) - continue - } - - sort.Sort(ByContainerPort(tcpPorts)) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), - podNameLabel: model.LabelValue(pod.ObjectMeta.Name), - podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), - podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), - podContainerNameLabel: model.LabelValue(container.Name), - podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), - podReadyLabel: model.LabelValue(ready), - } - - for _, port := range tcpPorts { - portLabel.WriteString(port.Name) - portLabel.WriteString("=") - portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) - portLabel.WriteString(",") - t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) - } - 
- t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) - - for k, v := range pod.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(podLabelPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - - for k, v := range pod.ObjectMeta.Annotations { - labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - - targets = append(targets, t) - - if !allContainers { - break - } - } - - if len(targets) == 0 { - log.Debugf("no targets for pod %s", pod.ObjectMeta.Name) - } - - return targets -} - -func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup { - kd.podsMu.RLock() - defer kd.podsMu.RUnlock() - - tg := &config.TargetGroup{ - Source: podSource(pod), - } - - // If this pod doesn't exist, return an empty target group - if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok { - return tg - } - if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { - return tg - } - - tg.Labels = model.LabelSet{ - roleLabel: model.LabelValue("container"), - } - tg.Targets = updatePodTargets(pod, true) - - return tg -} - -func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup { - tg := &config.TargetGroup{ - Source: podsTargetGroupName, - Labels: model.LabelSet{ - roleLabel: model.LabelValue("pod"), - }, - } - - for _, namespace := range kd.pods { - for _, pod := range namespace { - tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...) - } - } - - return tg -} diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go new file mode 100644 index 0000000000..8296188f8e --- /dev/null +++ b/retrieval/discovery/kubernetes/pod.go @@ -0,0 +1,332 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" + "golang.org/x/net/context" +) + +type podDiscovery struct { + mtx sync.RWMutex + pods map[string]map[string]*Pod + retryInterval time.Duration + kd *Discovery +} + +func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { + pods, _, err := d.getPods() + if err != nil { + log.Errorf("Cannot initialize pods collection: %s", err) + return + } + d.pods = pods + + initial := []*config.TargetGroup{d.updatePodsTargetGroup()} + for _, ns := range d.pods { + for _, pod := range ns { + initial = append(initial, d.updatePodTargetGroup(pod)) + } + } + + select { + case ch <- initial: + case <-ctx.Done(): + return + } + + update := make(chan *podEvent, 10) + go d.watchPods(update, ctx.Done(), d.retryInterval) + + for { + tgs := []*config.TargetGroup{} + select { + case <-ctx.Done(): + return + case e := <-update: + log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name) + // Update the per-pod target group + d.updatePod(e.Pod, e.EventType) + tgs = append(tgs, d.updatePodTargetGroup(e.Pod)) + // ...and update the all pods target group + tgs = append(tgs, d.updatePodsTargetGroup()) + } + if tgs == nil { + continue + } + + for _, tg := range tgs { + select { + 
case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } + } + } +} + +func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) { + res, err := d.kd.queryAPIServerPath(podsURL) + if err != nil { + return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) + } + + var pods PodList + if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body)) + } + + podMap := map[string]map[string]*Pod{} + for idx, pod := range pods.Items { + if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { + podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{} + } + log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) + podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx] + } + + return podMap, pods.ResourceVersion, nil +} + +func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) { + until(func() { + pods, resourceVersion, err := d.getPods() + if err != nil { + log.Errorf("Cannot initialize pods collection: %s", err) + return + } + d.mtx.Lock() + d.pods = pods + d.mtx.Unlock() + + req, err := http.NewRequest("GET", podsURL, nil) + if err != nil { + log.Errorf("Cannot create pods request: %s", err) + return + } + + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch pods: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch pods: %d", res.StatusCode) + return + } + + d := 
json.NewDecoder(res.Body) + + for { + var event podEvent + if err := d.Decode(&event); err != nil { + log.Errorf("Watch pods unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } + }, retryInterval, done) +} + +func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) { + d.mtx.Lock() + defer d.mtx.Unlock() + + switch eventType { + case Deleted: + if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok { + delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name) + if len(d.pods[pod.ObjectMeta.Namespace]) == 0 { + delete(d.pods, pod.ObjectMeta.Namespace) + } + } + case Added, Modified: + if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { + d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{} + } + d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod + } +} + +func (d *podDiscovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup { + d.mtx.RLock() + defer d.mtx.RUnlock() + + tg := &config.TargetGroup{ + Source: podSource(pod), + } + + // If this pod doesn't exist, return an empty target group + if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { + return tg + } + if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { + return tg + } + + tg.Labels = model.LabelSet{ + roleLabel: model.LabelValue("container"), + } + tg.Targets = updatePodTargets(pod, true) + + return tg +} + +func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup { + tg := &config.TargetGroup{ + Source: podsTargetGroupName, + Labels: model.LabelSet{ + roleLabel: model.LabelValue("pod"), + }, + } + + for _, namespace := range d.pods { + for _, pod := range namespace { + tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...) 
+ } + } + + return tg +} + +func podSource(pod *Pod) string { + return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name +} + +func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { + var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) + if pod.PodStatus.PodIP == "" { + log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name) + return targets + } + + if pod.PodStatus.Phase != "Running" { + log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name) + return targets + } + + ready := "unknown" + for _, cond := range pod.PodStatus.Conditions { + if strings.ToLower(cond.Type) == "ready" { + ready = strings.ToLower(cond.Status) + } + } + + sort.Sort(ByContainerName(pod.PodSpec.Containers)) + + for _, container := range pod.PodSpec.Containers { + // Collect a list of TCP ports + // Sort by port number, ascending + // Produce a target pointed at the first port + // Include a label containing all ports (portName=port,PortName=port,...,) + var tcpPorts []ContainerPort + var portLabel *bytes.Buffer = bytes.NewBufferString(",") + + for _, port := range container.Ports { + if port.Protocol == "TCP" { + tcpPorts = append(tcpPorts, port) + } + } + + if len(tcpPorts) == 0 { + log.Debugf("skipping container %s with no TCP ports", container.Name) + continue + } + + sort.Sort(ByContainerPort(tcpPorts)) + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), + podNameLabel: model.LabelValue(pod.ObjectMeta.Name), + podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), + podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), + podContainerNameLabel: model.LabelValue(container.Name), + podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), + podReadyLabel: model.LabelValue(ready), + } + + for _, port := range tcpPorts { + portLabel.WriteString(port.Name) + 
portLabel.WriteString("=") + portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) + portLabel.WriteString(",") + t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) + } + + t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) + + for k, v := range pod.ObjectMeta.Labels { + labelName := strutil.SanitizeLabelName(podLabelPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + + for k, v := range pod.ObjectMeta.Annotations { + labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + + targets = append(targets, t) + + if !allContainers { + break + } + } + + if len(targets) == 0 { + log.Debugf("no targets for pod %s", pod.ObjectMeta.Name) + } + + return targets +} + +type ByContainerPort []ContainerPort + +func (a ByContainerPort) Len() int { return len(a) } +func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } +func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +type ByContainerName []Container + +func (a ByContainerName) Len() int { return len(a) } +func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } From fdbe28df85bd5490f91120ff33657cefe0b0f86b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 1 Jul 2016 16:55:37 +0200 Subject: [PATCH 3/7] discovery/kubernetes: extract node discovery This change extracts node discovery into its own type. 
--- retrieval/discovery/kubernetes/discovery.go | 189 +-------------- retrieval/discovery/kubernetes/node.go | 242 ++++++++++++++++++++ 2 files changed, 252 insertions(+), 179 deletions(-) create mode 100644 retrieval/discovery/kubernetes/node.go diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index 1b7e02ca14..eff401569a 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -20,7 +20,6 @@ import ( "net" "net/http" "os" - "strconv" "sync" "time" @@ -107,10 +106,8 @@ type Discovery struct { apiServers []config.URL apiServersMu sync.RWMutex - nodes map[string]*Node services map[string]map[string]*Service // map of namespace to (map of pod name to pod) - nodesMu sync.RWMutex servicesMu sync.RWMutex runDone chan struct{} } @@ -139,7 +136,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { pd := &podDiscovery{ retryInterval: time.Duration(kd.Conf.RetryInterval), - pods: map[string]map[string]*Pod{}, kd: kd, } wg.Add(1) @@ -148,13 +144,22 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { wg.Done() }() + nd := &nodeDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + kd: kd, + } + wg.Add(1) + go func() { + nd.run(ctx, ch) + wg.Done() + }() + // Send an initial full view. // TODO(fabxc): this does not include all available services and service // endpoints yet. Service endpoints were also missing in the previous Sources() method. 
var all []*config.TargetGroup all = append(all, kd.updateAPIServersTargetGroup()) - all = append(all, kd.updateNodesTargetGroup()) select { case ch <- all: @@ -166,7 +171,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { update := make(chan interface{}, 10) - go kd.watchNodes(update, ctx.Done(), retryInterval) go kd.startServiceWatch(update, ctx.Done(), retryInterval) for { @@ -176,10 +180,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { return case event := <-update: switch obj := event.(type) { - case *nodeEvent: - log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name) - kd.updateNode(obj.Node, obj.EventType) - tg = append(tg, kd.updateNodesTargetGroup()) case *serviceEvent: log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name) tg = append(tg, kd.updateService(obj.Service, obj.EventType)) @@ -272,91 +272,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup { return tg } -func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup { - kd.nodesMu.RLock() - defer kd.nodesMu.RUnlock() - - tg := &config.TargetGroup{ - Source: nodesTargetGroupName, - Labels: model.LabelSet{ - roleLabel: model.LabelValue("node"), - }, - } - - // Now let's loop through the nodes & add them to the target group with appropriate labels. 
- for nodeName, node := range kd.nodes { - defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node) - if err != nil { - log.Debugf("Skipping node %s: %s", node.Name, err) - continue - } - - kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) - - address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(address), - model.InstanceLabel: model.LabelValue(nodeName), - } - - for addrType, ip := range nodeAddressMap { - labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType)) - t[model.LabelName(labelName)] = model.LabelValue(ip[0].String()) - } - - t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort)) - - for k, v := range node.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - tg.Targets = append(tg.Targets, t) - } - - return tg -} - -func (kd *Discovery) updateNode(node *Node, eventType EventType) { - kd.nodesMu.Lock() - defer kd.nodesMu.Unlock() - updatedNodeName := node.ObjectMeta.Name - switch eventType { - case Deleted: - // Deleted - remove from nodes map. - delete(kd.nodes, updatedNodeName) - case Added, Modified: - // Added/Modified - update the node in the nodes map. - kd.nodes[updatedNodeName] = node - } -} - -func (kd *Discovery) getNodes() (map[string]*Node, string, error) { - res, err := kd.queryAPIServerPath(nodesURL) - if err != nil { - // If we can't list nodes then we can't watch them. Assume this is a misconfiguration - // & return error. 
- return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status) - } - - var nodes NodeList - if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body)) - } - - nodeMap := map[string]*Node{} - for idx, node := range nodes.Items { - nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx] - } - - return nodeMap, nodes.ResourceVersion, nil -} - func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) { res, err := kd.queryAPIServerPath(servicesURL) if err != nil { @@ -387,61 +302,6 @@ func (kd *Discovery) getServices() (map[string]map[string]*Service, string, erro return serviceMap, services.ResourceVersion, nil } -// watchNodes watches nodes as they come & go. -func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - nodes, resourceVersion, err := kd.getNodes() - if err != nil { - log.Errorf("Cannot initialize nodes collection: %s", err) - return - } - - // Reset the known nodes. 
- kd.nodesMu.Lock() - kd.nodes = map[string]*Node{} - kd.nodesMu.Unlock() - - for _, node := range nodes { - events <- &nodeEvent{Added, node} - } - - req, err := http.NewRequest("GET", nodesURL, nil) - if err != nil { - log.Errorf("Cannot create nodes request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch nodes: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch nodes: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event nodeEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch nodes unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } - }, retryInterval, done) -} - // watchServices watches services as they come & go. func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { @@ -784,32 +644,3 @@ func until(f func(), period time.Duration, stopCh <-chan struct{}) { } } } - -// nodeAddresses returns the provided node's address, based on the priority: -// 1. NodeInternalIP -// 2. NodeExternalIP -// 3. NodeLegacyHostIP -// -// Copied from k8s.io/kubernetes/pkg/util/node/node.go -func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { - addresses := node.Status.Addresses - addressMap := map[NodeAddressType][]net.IP{} - for _, addr := range addresses { - ip := net.ParseIP(addr.Address) - // All addresses should be valid IPs. 
- if ip == nil { - continue - } - addressMap[addr.Type] = append(addressMap[addr.Type], ip) - } - if addresses, ok := addressMap[NodeInternalIP]; ok { - return addresses[0], addressMap, nil - } - if addresses, ok := addressMap[NodeExternalIP]; ok { - return addresses[0], addressMap, nil - } - if addresses, ok := addressMap[NodeLegacyHostIP]; ok { - return addresses[0], addressMap, nil - } - return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) -} diff --git a/retrieval/discovery/kubernetes/node.go b/retrieval/discovery/kubernetes/node.go new file mode 100644 index 0000000000..e64bc2eb14 --- /dev/null +++ b/retrieval/discovery/kubernetes/node.go @@ -0,0 +1,242 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package kubernetes + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "strconv" + "sync" + "time" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" + "golang.org/x/net/context" +) + +type nodeDiscovery struct { + mtx sync.RWMutex + nodes map[string]*Node + retryInterval time.Duration + kd *Discovery +} + +func (d *nodeDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { + select { + case ch <- []*config.TargetGroup{d.updateNodesTargetGroup()}: + case <-ctx.Done(): + return + } + + update := make(chan *nodeEvent, 10) + go d.watchNodes(update, ctx.Done(), d.retryInterval) + + for { + tgs := []*config.TargetGroup{} + select { + case <-ctx.Done(): + return + case e := <-update: + log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", e.EventType, e.Node.ObjectMeta.Name) + d.updateNode(e.Node, e.EventType) + tgs = append(tgs, d.updateNodesTargetGroup()) + } + if tgs == nil { + continue + } + + for _, tg := range tgs { + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } + } + } +} + +func (d *nodeDiscovery) updateNodesTargetGroup() *config.TargetGroup { + d.mtx.RLock() + defer d.mtx.RUnlock() + + tg := &config.TargetGroup{ + Source: nodesTargetGroupName, + Labels: model.LabelSet{ + roleLabel: model.LabelValue("node"), + }, + } + + // Now let's loop through the nodes & add them to the target group with appropriate labels. 
+ for nodeName, node := range d.nodes { + defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node) + if err != nil { + log.Debugf("Skipping node %s: %s", node.Name, err) + continue + } + + kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) + + address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort) + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(address), + model.InstanceLabel: model.LabelValue(nodeName), + } + + for addrType, ip := range nodeAddressMap { + labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType)) + t[model.LabelName(labelName)] = model.LabelValue(ip[0].String()) + } + + t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort)) + + for k, v := range node.ObjectMeta.Labels { + labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + tg.Targets = append(tg.Targets, t) + } + + return tg +} + +// watchNodes watches nodes as they come & go. +func (d *nodeDiscovery) watchNodes(events chan *nodeEvent, done <-chan struct{}, retryInterval time.Duration) { + until(func() { + nodes, resourceVersion, err := d.getNodes() + if err != nil { + log.Errorf("Cannot initialize nodes collection: %s", err) + return + } + + // Reset the known nodes. 
+ d.mtx.Lock() + d.nodes = map[string]*Node{} + d.mtx.Unlock() + + for _, node := range nodes { + events <- &nodeEvent{Added, node} + } + + req, err := http.NewRequest("GET", nodesURL, nil) + if err != nil { + log.Errorf("Cannot create nodes request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch nodes: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch nodes: %d", res.StatusCode) + return + } + + d := json.NewDecoder(res.Body) + + for { + var event nodeEvent + if err := d.Decode(&event); err != nil { + log.Errorf("Watch nodes unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } + }, retryInterval, done) +} + +func (d *nodeDiscovery) updateNode(node *Node, eventType EventType) { + d.mtx.Lock() + defer d.mtx.Unlock() + + updatedNodeName := node.ObjectMeta.Name + switch eventType { + case Deleted: + // Deleted - remove from nodes map. + delete(d.nodes, updatedNodeName) + case Added, Modified: + // Added/Modified - update the node in the nodes map. + d.nodes[updatedNodeName] = node + } +} + +func (d *nodeDiscovery) getNodes() (map[string]*Node, string, error) { + res, err := d.kd.queryAPIServerPath(nodesURL) + if err != nil { + // If we can't list nodes then we can't watch them. Assume this is a misconfiguration + // & return error. 
+ return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status) + } + + var nodes NodeList + if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body)) + } + + nodeMap := map[string]*Node{} + for idx, node := range nodes.Items { + nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx] + } + + return nodeMap, nodes.ResourceVersion, nil +} + +// nodeAddresses returns the provided node's address, based on the priority: +// 1. NodeInternalIP +// 2. NodeExternalIP +// 3. NodeLegacyHostIP +// +// Copied from k8s.io/kubernetes/pkg/util/node/node.go +func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { + addresses := node.Status.Addresses + addressMap := map[NodeAddressType][]net.IP{} + for _, addr := range addresses { + ip := net.ParseIP(addr.Address) + // All addresses should be valid IPs. + if ip == nil { + continue + } + addressMap[addr.Type] = append(addressMap[addr.Type], ip) + } + if addresses, ok := addressMap[NodeInternalIP]; ok { + return addresses[0], addressMap, nil + } + if addresses, ok := addressMap[NodeExternalIP]; ok { + return addresses[0], addressMap, nil + } + if addresses, ok := addressMap[NodeLegacyHostIP]; ok { + return addresses[0], addressMap, nil + } + return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) +} From e0f8caacd757538e3193bf8b18c84014197eedbb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 1 Jul 2016 17:45:07 +0200 Subject: [PATCH 4/7] discovery/kubernetes: extract service endpoint discovery This extracts discovery of services and their endpoints into its own type.
--- retrieval/discovery/kubernetes/discovery.go | 350 +------------------ retrieval/discovery/kubernetes/service.go | 366 ++++++++++++++++++++ 2 files changed, 377 insertions(+), 339 deletions(-) create mode 100644 retrieval/discovery/kubernetes/service.go diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index eff401569a..1e2a6698b3 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -14,7 +14,6 @@ package kubernetes import ( - "encoding/json" "fmt" "io/ioutil" "net" @@ -29,7 +28,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/httputil" - "github.com/prometheus/prometheus/util/strutil" ) const ( @@ -106,10 +104,7 @@ type Discovery struct { apiServers []config.URL apiServersMu sync.RWMutex - services map[string]map[string]*Service - // map of namespace to (map of pod name to pod) - servicesMu sync.RWMutex - runDone chan struct{} + runDone chan struct{} } // Initialize sets up the discovery for usage. @@ -154,6 +149,16 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { wg.Done() }() + sd := &serviceDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + kd: kd, + } + wg.Add(1) + go func() { + sd.run(ctx, ch) + wg.Done() + }() + // Send an initial full view. // TODO(fabxc): this does not include all available services and service // endpoints yet. Service endpoints were also missing in the previous Sources() method. 
@@ -167,41 +172,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { return } - retryInterval := time.Duration(kd.Conf.RetryInterval) - - update := make(chan interface{}, 10) - - go kd.startServiceWatch(update, ctx.Done(), retryInterval) - - for { - tg := []*config.TargetGroup{} - select { - case <-ctx.Done(): - return - case event := <-update: - switch obj := event.(type) { - case *serviceEvent: - log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name) - tg = append(tg, kd.updateService(obj.Service, obj.EventType)) - case *endpointsEvent: - log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name) - tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)) - } - } - - if tg == nil { - continue - } - - for _, t := range tg { - select { - case ch <- []*config.TargetGroup{t}: - case <-ctx.Done(): - return - } - } - } - wg.Wait() } @@ -213,9 +183,6 @@ func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { return kd.queryAPIServerReq(req) } -type client struct { -} - func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) { // Lock in case we need to rotate API servers to request. kd.apiServersMu.Lock() @@ -272,297 +239,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup { return tg } -func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) { - res, err := kd.queryAPIServerPath(servicesURL) - if err != nil { - // If we can't list services then we can't watch them. Assume this is a misconfiguration - // & return error. 
- return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status) - } - var services ServiceList - if err := json.NewDecoder(res.Body).Decode(&services); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body)) - } - - serviceMap := map[string]map[string]*Service{} - for idx, service := range services.Items { - namespace, ok := serviceMap[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - serviceMap[service.ObjectMeta.Namespace] = namespace - } - namespace[service.ObjectMeta.Name] = &services.Items[idx] - } - - return serviceMap, services.ResourceVersion, nil -} - -// watchServices watches services as they come & go. -func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted - // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with. - existingServices := kd.services - - // Reset the known services. 
- kd.servicesMu.Lock() - kd.services = map[string]map[string]*Service{} - kd.servicesMu.Unlock() - - services, resourceVersion, err := kd.getServices() - if err != nil { - log.Errorf("Cannot initialize services collection: %s", err) - return - } - - // Now let's loop through the old services & see if they still exist in here - for oldNSName, oldNS := range existingServices { - if ns, ok := services[oldNSName]; !ok { - for _, service := range existingServices[oldNSName] { - events <- &serviceEvent{Deleted, service} - } - } else { - for oldServiceName, oldService := range oldNS { - if _, ok := ns[oldServiceName]; !ok { - events <- &serviceEvent{Deleted, oldService} - } - } - } - } - - // Discard the existing services map for GC. - existingServices = nil - - for _, ns := range services { - for _, service := range ns { - events <- &serviceEvent{Added, service} - } - } - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - kd.watchServices(resourceVersion, events, done) - wg.Done() - }() - go func() { - kd.watchServiceEndpoints(resourceVersion, events, done) - wg.Done() - }() - - wg.Wait() - }, retryInterval, done) -} - -func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { - req, err := http.NewRequest("GET", servicesURL, nil) - if err != nil { - log.Errorf("Failed to create services request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch services: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch services: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event serviceEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch services unexpectedly closed: %s", err) - return - } - - select { - 
case events <- &event: - case <-done: - return - } - } -} - -// watchServiceEndpoints watches service endpoints as they come & go. -func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { - req, err := http.NewRequest("GET", endpointsURL, nil) - if err != nil { - log.Errorf("Failed to create service endpoints request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch service endpoints: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event endpointsEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch service endpoints unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } -} - -func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup { - kd.servicesMu.Lock() - defer kd.servicesMu.Unlock() - - switch eventType { - case Deleted: - return kd.deleteService(service) - case Added, Modified: - return kd.addService(service) - } - return nil -} - -func (kd *Discovery) deleteService(service *Service) *config.TargetGroup { - tg := &config.TargetGroup{Source: serviceSource(service)} - - delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name) - if len(kd.services[service.ObjectMeta.Namespace]) == 0 { - delete(kd.services, service.ObjectMeta.Namespace) - } - - return tg -} - -func (kd *Discovery) addService(service *Service) *config.TargetGroup { - namespace, ok := kd.services[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - kd.services[service.ObjectMeta.Namespace] = namespace - } - - 
namespace[service.ObjectMeta.Name] = service - endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) - - res, err := kd.queryAPIServerPath(endpointURL) - if err != nil { - log.Errorf("Error getting service endpoints: %s", err) - return nil - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to get service endpoints: %d", res.StatusCode) - return nil - } - - var eps Endpoints - if err := json.NewDecoder(res.Body).Decode(&eps); err != nil { - log.Errorf("Error getting service endpoints: %s", err) - return nil - } - - return kd.updateServiceTargetGroup(service, &eps) -} - -func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup { - tg := &config.TargetGroup{ - Source: serviceSource(service), - Labels: model.LabelSet{ - serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace), - serviceNameLabel: model.LabelValue(service.ObjectMeta.Name), - }, - } - - for k, v := range service.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k) - tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) - } - - for k, v := range service.ObjectMeta.Annotations { - labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) - tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) - } - - serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc" - - // Append the first TCP service port if one exists. - for _, port := range service.Spec.Ports { - if port.Protocol == ProtocolTCP { - serviceAddress += fmt.Sprintf(":%d", port.Port) - break - } - } - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(serviceAddress), - roleLabel: model.LabelValue("service"), - } - tg.Targets = append(tg.Targets, t) - - // Now let's loop through the endpoints & add them to the target group with appropriate labels. 
- for _, ss := range eps.Subsets { - epPort := ss.Ports[0].Port - - for _, addr := range ss.Addresses { - ipAddr := addr.IP - if len(ipAddr) == net.IPv6len { - ipAddr = "[" + ipAddr + "]" - } - address := fmt.Sprintf("%s:%d", ipAddr, epPort) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(address), - roleLabel: model.LabelValue("endpoint"), - } - - tg.Targets = append(tg.Targets, t) - } - } - - return tg -} - -func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { - kd.servicesMu.Lock() - defer kd.servicesMu.Unlock() - - serviceNamespace := endpoints.ObjectMeta.Namespace - serviceName := endpoints.ObjectMeta.Name - - if service, ok := kd.services[serviceNamespace][serviceName]; ok { - return kd.updateServiceTargetGroup(service, endpoints) - } - return nil -} - func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) { bearerTokenFile := conf.BearerTokenFile caFile := conf.TLSConfig.CAFile @@ -622,10 +298,6 @@ func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, err }, nil } -func serviceSource(service *Service) string { - return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name -} - // Until loops until stop channel is closed, running f every period. // f may not be invoked if stop channel is already closed. func until(f func(), period time.Duration, stopCh <-chan struct{}) { diff --git a/retrieval/discovery/kubernetes/service.go b/retrieval/discovery/kubernetes/service.go new file mode 100644 index 0000000000..8bd7df8a8d --- /dev/null +++ b/retrieval/discovery/kubernetes/service.go @@ -0,0 +1,366 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "sync" + "time" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" + "golang.org/x/net/context" +) + +type serviceDiscovery struct { + mtx sync.RWMutex + services map[string]map[string]*Service + retryInterval time.Duration + kd *Discovery +} + +func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { + update := make(chan interface{}, 10) + go d.startServiceWatch(update, ctx.Done(), d.retryInterval) + + for { + tgs := []*config.TargetGroup{} + select { + case <-ctx.Done(): + return + case event := <-update: + switch e := event.(type) { + case *endpointsEvent: + log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name) + tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType)) + case *serviceEvent: + log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name) + tgs = append(tgs, d.updateService(e.Service, e.EventType)) + } + } + if tgs == nil { + continue + } + + for _, tg := range tgs { + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } + } + } + +} + +func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) { + res, err := d.kd.queryAPIServerPath(servicesURL) + if err != nil { + // If we can't 
list services then we can't watch them. Assume this is a misconfiguration + // & return error. + return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status) + } + var services ServiceList + if err := json.NewDecoder(res.Body).Decode(&services); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body)) + } + + serviceMap := map[string]map[string]*Service{} + for idx, service := range services.Items { + namespace, ok := serviceMap[service.ObjectMeta.Namespace] + if !ok { + namespace = map[string]*Service{} + serviceMap[service.ObjectMeta.Namespace] = namespace + } + namespace[service.ObjectMeta.Name] = &services.Items[idx] + } + + return serviceMap, services.ResourceVersion, nil +} + +// startServiceWatch lists the current services and then watches services and service endpoints as they come & go. +func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { + until(func() { + // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted + // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with. + d.mtx.Lock() + existingServices := d.services + + // Reset the known services.
+ d.services = map[string]map[string]*Service{} + d.mtx.Unlock() + + services, resourceVersion, err := d.getServices() + if err != nil { + log.Errorf("Cannot initialize services collection: %s", err) + return + } + + // Now let's loop through the old services & see if they still exist in here + for oldNSName, oldNS := range existingServices { + if ns, ok := services[oldNSName]; !ok { + for _, service := range existingServices[oldNSName] { + events <- &serviceEvent{Deleted, service} + } + } else { + for oldServiceName, oldService := range oldNS { + if _, ok := ns[oldServiceName]; !ok { + events <- &serviceEvent{Deleted, oldService} + } + } + } + } + + // Discard the existing services map for GC. + existingServices = nil + + for _, ns := range services { + for _, service := range ns { + events <- &serviceEvent{Added, service} + } + } + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + d.watchServices(resourceVersion, events, done) + wg.Done() + }() + go func() { + d.watchServiceEndpoints(resourceVersion, events, done) + wg.Done() + }() + + wg.Wait() + }, retryInterval, done) +} + +func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { + req, err := http.NewRequest("GET", servicesURL, nil) + if err != nil { + log.Errorf("Failed to create services request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch services: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch services: %d", res.StatusCode) + return + } + + dec := json.NewDecoder(res.Body) + + for { + var event serviceEvent + if err := dec.Decode(&event); err != nil { + log.Errorf("Watch services unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + 
case <-done: + return + } + } +} + +// watchServiceEndpoints watches service endpoints as they come & go. +func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { + req, err := http.NewRequest("GET", endpointsURL, nil) + if err != nil { + log.Errorf("Failed to create service endpoints request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch service endpoints: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) + return + } + + dec := json.NewDecoder(res.Body) + + for { + var event endpointsEvent + if err := dec.Decode(&event); err != nil { + log.Errorf("Watch service endpoints unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } +} + +func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup { + d.mtx.Lock() + defer d.mtx.Unlock() + + switch eventType { + case Deleted: + return d.deleteService(service) + case Added, Modified: + return d.addService(service) + } + return nil +} + +func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup { + tg := &config.TargetGroup{Source: serviceSource(service)} + + delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name) + if len(d.services[service.ObjectMeta.Namespace]) == 0 { + delete(d.services, service.ObjectMeta.Namespace) + } + + return tg +} + +func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup { + namespace, ok := d.services[service.ObjectMeta.Namespace] + if !ok { + namespace = map[string]*Service{} + d.services[service.ObjectMeta.Namespace] = namespace + } + + 
namespace[service.ObjectMeta.Name] = service + endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) + + res, err := d.kd.queryAPIServerPath(endpointURL) + if err != nil { + log.Errorf("Error getting service endpoints: %s", err) + return nil + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to get service endpoints: %d", res.StatusCode) + return nil + } + + var eps Endpoints + if err := json.NewDecoder(res.Body).Decode(&eps); err != nil { + log.Errorf("Error getting service endpoints: %s", err) + return nil + } + + return d.updateServiceTargetGroup(service, &eps) +} + +func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup { + tg := &config.TargetGroup{ + Source: serviceSource(service), + Labels: model.LabelSet{ + serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace), + serviceNameLabel: model.LabelValue(service.ObjectMeta.Name), + }, + } + + for k, v := range service.ObjectMeta.Labels { + labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k) + tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) + } + + for k, v := range service.ObjectMeta.Annotations { + labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) + tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) + } + + serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc" + + // Append the first TCP service port if one exists. + for _, port := range service.Spec.Ports { + if port.Protocol == ProtocolTCP { + serviceAddress += fmt.Sprintf(":%d", port.Port) + break + } + } + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(serviceAddress), + roleLabel: model.LabelValue("service"), + } + tg.Targets = append(tg.Targets, t) + + // Now let's loop through the endpoints & add them to the target group with appropriate labels. 
+ for _, ss := range eps.Subsets { + epPort := ss.Ports[0].Port + + for _, addr := range ss.Addresses { + ipAddr := addr.IP + if len(ipAddr) == net.IPv6len { + ipAddr = "[" + ipAddr + "]" + } + address := fmt.Sprintf("%s:%d", ipAddr, epPort) + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(address), + roleLabel: model.LabelValue("endpoint"), + } + + tg.Targets = append(tg.Targets, t) + } + } + + return tg +} + +func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { + d.mtx.Lock() + defer d.mtx.Unlock() + + serviceNamespace := endpoints.ObjectMeta.Namespace + serviceName := endpoints.ObjectMeta.Name + + if service, ok := d.services[serviceNamespace][serviceName]; ok { + return d.updateServiceTargetGroup(service, endpoints) + } + return nil +} + +func serviceSource(service *Service) string { + return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name +} From 7221228843bf0c969cd7357115adcebe24860219 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 1 Jul 2016 19:28:29 +0200 Subject: [PATCH 5/7] discovery/kubernetes: select between discovery role This adds `role` field to the Kubernetes SD config, which indicates which type of Kubernetes SD should be run. This no longer allows discovering pods and nodes with the same SD configuration for example. 
--- config/config.go | 28 +++++++- config/config_test.go | 1 + config/testdata/conf.good.yml | 3 +- .../testdata/kubernetes_bearertoken.bad.yml | 3 +- .../kubernetes_bearertoken_basicauth.bad.yml | 3 +- retrieval/discovery/kubernetes/discovery.go | 65 +++++++------------ 6 files changed, 58 insertions(+), 45 deletions(-) diff --git a/config/config.go b/config/config.go index 36847d045f..079d546a6b 100644 --- a/config/config.go +++ b/config/config.go @@ -792,6 +792,7 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro // KubernetesSDConfig is the configuration for Kubernetes service discovery. type KubernetesSDConfig struct { APIServers []URL `yaml:"api_servers"` + Role string `yaml:"role"` InCluster bool `yaml:"in_cluster,omitempty"` BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` @@ -804,6 +805,29 @@ type KubernetesSDConfig struct { XXX map[string]interface{} `yaml:",inline"` } +type KubernetesRole string + +const ( + KubernetesRoleNode = "node" + KubernetesRolePod = "pod" + KubernetesRoleContainer = "container" + KubernetesRoleService = "service" + KubernetesRoleEndpoint = "endpoint" + KubernetesRoleAPIServer = "apiserver" +) + +func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := unmarshal((*string)(c)); err != nil { + return err + } + switch *c { + case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleContainer, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleAPIServer: + return nil + default: + return fmt.Errorf("Unknown Kubernetes SD role %q", c) + } +} + // UnmarshalYAML implements the yaml.Unmarshaler interface. 
func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = DefaultKubernetesSDConfig @@ -815,6 +839,9 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil { return err } + if c.Role == "" { + return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)") + } if len(c.APIServers) == 0 { return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server") } @@ -824,7 +851,6 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) { return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured") } - return nil } diff --git a/config/config_test.go b/config/config_test.go index a7555d8784..ca9ab86a78 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -223,6 +223,7 @@ var expectedConf = &Config{ KubernetesSDConfigs: []*KubernetesSDConfig{ { APIServers: []URL{kubernetesSDHostURL()}, + Role: KubernetesRoleEndpoint, BasicAuth: &BasicAuth{ Username: "myusername", Password: "mypassword", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index b911db7673..ea9acfbf1e 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -109,7 +109,8 @@ scrape_configs: - job_name: service-kubernetes kubernetes_sd_configs: - - api_servers: + - role: endpoint + api_servers: - 'https://localhost:1234' basic_auth: diff --git a/config/testdata/kubernetes_bearertoken.bad.yml b/config/testdata/kubernetes_bearertoken.bad.yml index 756644be6f..533742fd3f 100644 --- a/config/testdata/kubernetes_bearertoken.bad.yml +++ b/config/testdata/kubernetes_bearertoken.bad.yml @@ -2,7 +2,8 @@ scrape_configs: - job_name: prometheus kubernetes_sd_configs: - - api_servers: + - role: node + api_servers: - 'https://localhost:1234' 
bearer_token: 1234 diff --git a/config/testdata/kubernetes_bearertoken_basicauth.bad.yml b/config/testdata/kubernetes_bearertoken_basicauth.bad.yml index 9ef16ae4bd..e7c1633d5c 100644 --- a/config/testdata/kubernetes_bearertoken_basicauth.bad.yml +++ b/config/testdata/kubernetes_bearertoken_basicauth.bad.yml @@ -2,7 +2,8 @@ scrape_configs: - job_name: prometheus kubernetes_sd_configs: - - api_servers: + - role: pod + api_servers: - 'https://localhost:1234' bearer_token: 1234 diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index 1e2a6698b3..d06411d679 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -127,52 +127,35 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { log.Debugf("Kubernetes Discovery.Run beginning") defer close(ch) - var wg sync.WaitGroup - - pd := &podDiscovery{ - retryInterval: time.Duration(kd.Conf.RetryInterval), - kd: kd, - } - wg.Add(1) - go func() { + switch kd.Conf.Role { + case config.KubernetesRolePod, config.KubernetesRoleContainer: + pd := &podDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + kd: kd, + } pd.run(ctx, ch) - wg.Done() - }() - - nd := &nodeDiscovery{ - retryInterval: time.Duration(kd.Conf.RetryInterval), - kd: kd, - } - wg.Add(1) - go func() { + case config.KubernetesRoleNode: + nd := &nodeDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + kd: kd, + } nd.run(ctx, ch) - wg.Done() - }() - - sd := &serviceDiscovery{ - retryInterval: time.Duration(kd.Conf.RetryInterval), - kd: kd, - } - wg.Add(1) - go func() { + case config.KubernetesRoleService, config.KubernetesRoleEndpoint: + sd := &serviceDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + kd: kd, + } sd.run(ctx, ch) - wg.Done() - }() - - // Send an initial full view. - // TODO(fabxc): this does not include all available services and service - // endpoints yet. 
Service endpoints were also missing in the previous Sources() method. - var all []*config.TargetGroup - - all = append(all, kd.updateAPIServersTargetGroup()) - - select { - case ch <- all: - case <-ctx.Done(): + case config.KubernetesRoleAPIServer: + select { + case ch <- []*config.TargetGroup{kd.updateAPIServersTargetGroup()}: + case <-ctx.Done(): + return + } + default: + log.Errorf("unknown Kubernetes discovery kind %q", kd.Conf.Role) return } - - wg.Wait() } func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { From 0ff354341b5bb3d7a38d2b5f1ebe9a6342e6d9b7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 5 Jul 2016 10:13:24 +0200 Subject: [PATCH 6/7] discovery/kubernetes: remove unused channel --- retrieval/discovery/kubernetes/discovery.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index d06411d679..f05e2aa5e7 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -104,7 +104,6 @@ type Discovery struct { apiServers []config.URL apiServersMu sync.RWMutex - runDone chan struct{} } // Initialize sets up the discovery for usage. @@ -117,14 +116,13 @@ func (kd *Discovery) Initialize() error { kd.apiServers = kd.Conf.APIServers kd.client = client - kd.runDone = make(chan struct{}) return nil } // Run implements the TargetProvider interface. 
func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - log.Debugf("Kubernetes Discovery.Run beginning") + log.Debugf("Start Kubernetes service discovery") defer close(ch) switch kd.Conf.Role { From 4591a2623b5726bfa0d09243490d56ae3f9824ef Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 5 Jul 2016 10:24:53 +0200 Subject: [PATCH 7/7] discovery/kubernetes: filter pod/container, service/endpoint This change distinguishes and filters by pod/container and service/endpoint in the respective sub-SDs. --- retrieval/discovery/kubernetes/pod.go | 28 ++++++++++----- retrieval/discovery/kubernetes/service.go | 44 ++++++++++++----------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go index 8296188f8e..1d9759d553 100644 --- a/retrieval/discovery/kubernetes/pod.go +++ b/retrieval/discovery/kubernetes/pod.go @@ -48,10 +48,15 @@ func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) } d.pods = pods - initial := []*config.TargetGroup{d.updatePodsTargetGroup()} - for _, ns := range d.pods { - for _, pod := range ns { - initial = append(initial, d.updatePodTargetGroup(pod)) + initial := []*config.TargetGroup{} + switch d.kd.Conf.Role { + case config.KubernetesRolePod: + initial = append(initial, d.updatePodsTargetGroup()) + case config.KubernetesRoleContainer: + for _, ns := range d.pods { + for _, pod := range ns { + initial = append(initial, d.updateContainerTargetGroup(pod)) + } } } @@ -71,11 +76,16 @@ func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) return case e := <-update: log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name) - // Update the per-pod target group d.updatePod(e.Pod, e.EventType) - tgs = append(tgs, d.updatePodTargetGroup(e.Pod)) - // ...and update the all pods target group - tgs = append(tgs, d.updatePodsTargetGroup()) + 
+ switch d.kd.Conf.Role { + case config.KubernetesRoleContainer: + // Update the per-pod target group + tgs = append(tgs, d.updateContainerTargetGroup(e.Pod)) + case config.KubernetesRolePod: + // Update the all pods target group + tgs = append(tgs, d.updatePodsTargetGroup()) + } } if tgs == nil { continue @@ -188,7 +198,7 @@ func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) { } } -func (d *podDiscovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup { +func (d *podDiscovery) updateContainerTargetGroup(pod *Pod) *config.TargetGroup { d.mtx.RLock() defer d.mtx.RUnlock() diff --git a/retrieval/discovery/kubernetes/service.go b/retrieval/discovery/kubernetes/service.go index 8bd7df8a8d..3c0c6d04b9 100644 --- a/retrieval/discovery/kubernetes/service.go +++ b/retrieval/discovery/kubernetes/service.go @@ -318,30 +318,34 @@ func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpo break } } + switch d.kd.Conf.Role { + case config.KubernetesRoleService: + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(serviceAddress), + roleLabel: model.LabelValue("service"), + } + tg.Targets = append(tg.Targets, t) - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(serviceAddress), - roleLabel: model.LabelValue("service"), - } - tg.Targets = append(tg.Targets, t) + case config.KubernetesRoleEndpoint: + // Now let's loop through the endpoints & add them to the target group + // with appropriate labels. + for _, ss := range eps.Subsets { + epPort := ss.Ports[0].Port - // Now let's loop through the endpoints & add them to the target group with appropriate labels. 
- for _, ss := range eps.Subsets { - epPort := ss.Ports[0].Port + for _, addr := range ss.Addresses { + ipAddr := addr.IP + if len(ipAddr) == net.IPv6len { + ipAddr = "[" + ipAddr + "]" + } + address := fmt.Sprintf("%s:%d", ipAddr, epPort) - for _, addr := range ss.Addresses { - ipAddr := addr.IP - if len(ipAddr) == net.IPv6len { - ipAddr = "[" + ipAddr + "]" + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(address), + roleLabel: model.LabelValue("endpoint"), + } + + tg.Targets = append(tg.Targets, t) } - address := fmt.Sprintf("%s:%d", ipAddr, epPort) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(address), - roleLabel: model.LabelValue("endpoint"), - } - - tg.Targets = append(tg.Targets, t) } }