From ce45040e47e0bddd576f4d79e143e3354d9b5c23 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 17 Oct 2016 11:05:13 +0200 Subject: [PATCH] kubernetes: fix missing port labels This commit fixes endpoint port labeling, adjusts tests accordingly and enhances test delta printing --- .../kubernetes/{endpoint.go => endpoints.go} | 13 ++++-- .../{endpoint_test.go => endpoints_test.go} | 41 ++++++++++--------- retrieval/discovery/kubernetes/kubernetes.go | 2 +- retrieval/discovery/kubernetes/node_test.go | 30 ++++++-------- retrieval/discovery/kubernetes/pod.go | 6 +-- retrieval/discovery/kubernetes/pod_test.go | 2 +- 6 files changed, 49 insertions(+), 45 deletions(-) rename retrieval/discovery/kubernetes/{endpoint.go => endpoints.go} (92%) rename retrieval/discovery/kubernetes/{endpoint_test.go => endpoints_test.go} (88%) diff --git a/retrieval/discovery/kubernetes/endpoint.go b/retrieval/discovery/kubernetes/endpoints.go similarity index 92% rename from retrieval/discovery/kubernetes/endpoint.go rename to retrieval/discovery/kubernetes/endpoints.go index 3fac8ce71f..cb7a9e5519 100644 --- a/retrieval/discovery/kubernetes/endpoint.go +++ b/retrieval/discovery/kubernetes/endpoints.go @@ -106,6 +106,8 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } } e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + // TODO(fabxc): potentially remove add and delete event handlers. Those should + // be triggered via the endpoint handlers already. AddFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, @@ -147,7 +149,7 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup { seenPods := map[string]*podEntry{} add := func(addr apiv1.EndpointAddress, port apiv1.EndpointPort, ready string) { - a := net.JoinHostPort(addr.IP, strconv.FormatInt(int64(port.Port), 10)) + a := net.JoinHostPort(addr.IP, strconv.FormatUint(uint64(port.Port), 10)) target := model.LabelSet{ model.AddressLabel: lv(a), @@ -177,8 +179,11 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup { for _, c := range pod.Spec.Containers { for _, cport := range c.Ports { if port.Port == cport.ContainerPort { + ports := strconv.FormatUint(uint64(port.Port), 10) + target[podContainerNameLabel] = lv(c.Name) - target[podContainerPortNameLabel] = lv(port.Name) + target[podContainerPortNameLabel] = lv(cport.Name) + target[podContainerPortNumberLabel] = lv(ports) target[podContainerPortProtocolLabel] = lv(string(port.Protocol)) break } @@ -221,12 +226,14 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup { continue } - a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatInt(int64(cport.ContainerPort), 10)) + a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatUint(uint64(cport.ContainerPort), 10)) + ports := strconv.FormatUint(uint64(cport.ContainerPort), 10) target := model.LabelSet{ model.AddressLabel: lv(a), podContainerNameLabel: lv(c.Name), podContainerPortNameLabel: lv(cport.Name), + podContainerPortNumberLabel: lv(ports), podContainerPortProtocolLabel: lv(string(cport.Protocol)), } tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod))) diff --git a/retrieval/discovery/kubernetes/endpoint_test.go b/retrieval/discovery/kubernetes/endpoints_test.go similarity index 88% rename from retrieval/discovery/kubernetes/endpoint_test.go rename to retrieval/discovery/kubernetes/endpoints_test.go index b697b03cb0..8ea304a19f 100644 --- a/retrieval/discovery/kubernetes/endpoint_test.go +++ b/retrieval/discovery/kubernetes/endpoints_test.go @@ -14,7 +14,6 @@ package kubernetes import ( - //"fmt" "testing" "github.com/prometheus/common/log" @@ -23,22 +22,22 @@ import ( "k8s.io/client-go/1.5/pkg/api/v1" ) -func endpointStoreKeyFunc(obj interface{}) (string, error) { +func endpointsStoreKeyFunc(obj interface{}) (string, error) { return obj.(*v1.Endpoints).ObjectMeta.Name, nil } -func newFakeEndpointInformer() *fakeInformer { - return newFakeInformer(endpointStoreKeyFunc) +func newFakeEndpointsInformer() *fakeInformer { + return newFakeInformer(endpointsStoreKeyFunc) } -func makeTestEndpointDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer) { +func makeTestEndpointsDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer) { svc := newFakeServiceInformer() - eps := newFakeEndpointInformer() + eps := newFakeEndpointsInformer() pod := newFakePodInformer() return NewEndpoints(log.Base(), svc, eps, pod), svc, eps, pod } -func makeEndpoint() *v1.Endpoints { +func makeEndpoints() *v1.Endpoints { return &v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: "testendpoints", @@ -82,9 +81,9 @@ func makeEndpoint() *v1.Endpoints { } } -func TestEndpointDiscoveryInitial(t *testing.T) { - n, _, eps, _ := makeTestEndpointDiscovery() - eps.GetStore().Add(makeEndpoint()) +func TestEndpointsDiscoveryInitial(t *testing.T) { + n, _, eps, _ := makeTestEndpointsDiscovery() + eps.GetStore().Add(makeEndpoints()) k8sDiscoveryTest{ discovery: n, @@ -120,8 +119,8 @@ func TestEndpointDiscoveryInitial(t *testing.T) { }.Run(t) } -func TestEndpointDiscoveryAdd(t *testing.T) { - n, _, eps, pods := makeTestEndpointDiscovery() +func TestEndpointsDiscoveryAdd(t *testing.T) { + n, _, eps, pods := makeTestEndpointsDiscovery() pods.GetStore().Add(&v1.Pod{ ObjectMeta: v1.ObjectMeta{ Name: "testpod", @@ -207,7 +206,8 @@ func TestEndpointDiscoveryAdd(t *testing.T) { "__meta_kubernetes_pod_node_name": "testnode", "__meta_kubernetes_pod_host_ip": "2.3.4.5", "__meta_kubernetes_pod_container_name": "c1", - "__meta_kubernetes_pod_container_port_name": "testport", + "__meta_kubernetes_pod_container_port_name": "mainport", + "__meta_kubernetes_pod_container_port_number": "9000", "__meta_kubernetes_pod_container_port_protocol": "TCP", }, model.LabelSet{ @@ -219,6 +219,7 @@ func TestEndpointDiscoveryAdd(t *testing.T) { "__meta_kubernetes_pod_host_ip": "2.3.4.5", "__meta_kubernetes_pod_container_name": "c2", "__meta_kubernetes_pod_container_port_name": "sideport", + "__meta_kubernetes_pod_container_port_number": "9001", "__meta_kubernetes_pod_container_port_protocol": "TCP", }, }, @@ -232,13 +233,13 @@ func TestEndpointDiscoveryAdd(t *testing.T) { }.Run(t) } -func TestEndpointDiscoveryDelete(t *testing.T) { - n, _, eps, _ := makeTestEndpointDiscovery() - eps.GetStore().Add(makeEndpoint()) +func TestEndpointsDiscoveryDelete(t *testing.T) { + n, _, eps, _ := makeTestEndpointsDiscovery() + eps.GetStore().Add(makeEndpoints()) k8sDiscoveryTest{ discovery: n, - afterStart: func() { go func() { eps.Delete(makeEndpoint()) }() }, + afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() }, expectedRes: []*config.TargetGroup{ &config.TargetGroup{ Source: "endpoints/default/testendpoints", @@ -247,9 +248,9 @@ func TestEndpointDiscoveryDelete(t *testing.T) { }.Run(t) } -func TestEndpointDiscoveryUpdate(t *testing.T) { - n, _, eps, _ := makeTestEndpointDiscovery() - eps.GetStore().Add(makeEndpoint()) +func TestEndpointsDiscoveryUpdate(t *testing.T) { + n, _, eps, _ := makeTestEndpointsDiscovery() + eps.GetStore().Add(makeEndpoints()) k8sDiscoveryTest{ discovery: n, diff --git a/retrieval/discovery/kubernetes/kubernetes.go b/retrieval/discovery/kubernetes/kubernetes.go index 2a1f4b6872..9b9da790d4 100644 --- a/retrieval/discovery/kubernetes/kubernetes.go +++ b/retrieval/discovery/kubernetes/kubernetes.go @@ -137,7 +137,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case "pod": plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) - pod := NewPods( + pod := NewPod( k.logger.With("kubernetes_sd", "pod"), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), ) diff --git a/retrieval/discovery/kubernetes/node_test.go b/retrieval/discovery/kubernetes/node_test.go index 84dae2d04a..0bb69cec9e 100644 --- a/retrieval/discovery/kubernetes/node_test.go +++ b/retrieval/discovery/kubernetes/node_test.go @@ -14,8 +14,8 @@ package kubernetes import ( + "encoding/json" "fmt" - "reflect" "sync" "testing" "time" @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/stretchr/testify/require" "golang.org/x/net/context" "k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/tools/cache" @@ -120,33 +121,28 @@ func (d k8sDiscoveryTest) Run(t *testing.T) { initialRes := <-ch if d.expectedInitial != nil { - if !reflect.DeepEqual(d.expectedInitial, initialRes) { - printExpected(d.expectedInitial, initialRes) - t.Fatal("Initial result target group not generated as expected") - } + requireTargetGroups(t, d.expectedInitial, initialRes) } if d.afterStart != nil && d.expectedRes != nil { d.afterStart() res := <-ch - if !reflect.DeepEqual(d.expectedRes, res) { - printExpected(d.expectedRes, res) - t.Fatal("Result target group not generated as expected") - } + requireTargetGroups(t, d.expectedRes, res) } } -func printExpected(expected, res []*config.TargetGroup) { - fmt.Printf("\nExpected %d TargetGroups:\n\n", len(expected)) - for _, e := range expected { - fmt.Printf("%#v\n\n", e) +func requireTargetGroups(t *testing.T, expected, res []*config.TargetGroup) { + b1, err := json.Marshal(expected) + if err != nil { + panic(err) + } + b2, err := json.Marshal(res) + if err != nil { + panic(err) } - fmt.Printf("\nResult %d TargetGroups:\n\n%", len(res)) - for _, e := range res { - fmt.Printf("%#v\n\n", e) - } + require.JSONEq(t, string(b1), string(b2)) } func nodeStoreKeyFunc(obj interface{}) (string, error) { diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go index dfff0136e7..3d16ea8b9d 100644 --- a/retrieval/discovery/kubernetes/pod.go +++ b/retrieval/discovery/kubernetes/pod.go @@ -36,8 +36,8 @@ type Pod struct { logger log.Logger } -// NewPods creates a new pod discovery. -func NewPods(l log.Logger, pods cache.SharedInformer) *Pod { +// NewPod creates a new pod discovery. +func NewPod(l log.Logger, pods cache.SharedInformer) *Pod { return &Pod{ informer: pods, store: pods.GetStore(), @@ -147,7 +147,7 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup { } // Otherwise create one target for each container/port combination. for _, port := range c.Ports { - ports := strconv.FormatInt(int64(port.ContainerPort), 10) + ports := strconv.FormatUint(uint64(port.ContainerPort), 10) addr := net.JoinHostPort(pod.Status.PodIP, ports) tg.Targets = append(tg.Targets, model.LabelSet{ diff --git a/retrieval/discovery/kubernetes/pod_test.go b/retrieval/discovery/kubernetes/pod_test.go index 7f9e0c7fea..8b3112e524 100644 --- a/retrieval/discovery/kubernetes/pod_test.go +++ b/retrieval/discovery/kubernetes/pod_test.go @@ -33,7 +33,7 @@ func newFakePodInformer() *fakeInformer { func makeTestPodDiscovery() (*Pod, *fakeInformer) { i := newFakePodInformer() - return NewPods(log.Base(), i), i + return NewPod(log.Base(), i), i } func makeMultiPortPod() *v1.Pod {