From 80fdfe2b72b8a0c5500515757d974c9350e7bd02 Mon Sep 17 00:00:00 2001 From: Swati Gupta Date: Mon, 9 Feb 2026 14:11:07 -0800 Subject: [PATCH] e2e: node: podresources: fix grpc connection issues when kubelet restarts Signed-off-by: Swati Gupta --- test/e2e_node/podresources_test.go | 193 ++++++++++++++++++++++++----- 1 file changed, 160 insertions(+), 33 deletions(-) diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go index bc386dd7d01..45760961330 100644 --- a/test/e2e_node/podresources_test.go +++ b/test/e2e_node/podresources_test.go @@ -32,6 +32,7 @@ import ( "github.com/onsi/gomega/types" "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" v1 "k8s.io/api/core/v1" @@ -963,8 +964,6 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel tpd = newTestPodData() ginkgo.By("checking the output when only pods which don't require resources are present") - expectListAndGetConsistent(ctx, cli, "pod-00", f.Namespace.Name) - expected = []podDesc{ { podName: "pod-00", @@ -972,6 +971,7 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel }, } tpd.createPodsForTest(ctx, f, expected) + expectListAndGetConsistent(ctx, cli, "pod-00", f.Namespace.Name) resp = podresourcesGetWithRetry(ctx, cli, f.Namespace.Name, "pod-00") podResourceList = []*kubeletpodresourcesv1.PodResources{resp.GetPodResources()} @@ -983,8 +983,6 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel tpd = newTestPodData() ginkgo.By("checking the output when only pod require CPU") - expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name) - expected = []podDesc{ { podName: "pod-01", @@ -993,6 +991,7 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel }, } tpd.createPodsForTest(ctx, f, expected) + expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name) resp = podresourcesGetWithRetry(ctx, cli, f.Namespace.Name, "pod-01") podResourceList = []*kubeletpodresourcesv1.PodResources{resp.GetPodResources()} @@ -1004,8 +1003,6 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel tpd = newTestPodData() ginkgo.By("checking the output when a pod has multiple containers and only one of them requires exclusive CPUs") - expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name) - expected = []podDesc{ { podName: "pod-01", @@ -1028,6 +1025,7 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel }, } tpd.createPodsForTest(ctx, f, expected) + expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name) resp = podresourcesGetWithRetry(ctx, cli, f.Namespace.Name, "pod-01") podResourceList = []*kubeletpodresourcesv1.PodResources{resp.GetPodResources()} @@ -1236,13 +1234,6 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso }) ginkgo.It("should succeed when calling Get for a valid pod", func(ctx context.Context) { - endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) - framework.ExpectNoError(err, "LocalEndpoint() faild err: %v", err) - - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) - defer framework.ExpectNoError(conn.Close()) - ginkgo.By("checking Get succeeds when the feature gate is enabled") pd := podDesc{ podName: "fg-enabled-pod", @@ -1252,10 +1243,44 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso pod := makePodResourcesTestPod(pd) pod = e2epod.NewPodClient(f).Create(ctx, pod) defer e2epod.NewPodClient(f).DeleteSync(ctx, pod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) - err = e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) + err := e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) framework.ExpectNoError(err) - res := podresourcesGetWithRetry(ctx, cli, pod.Namespace, pod.Name) + // Kubelet restarts when applying CPUManager static policy; + // podresources socket may not be immediately ready. + // Retry with a fresh connection. + waitForPodResourcesV1Serving(ctx) + + var ( + res *kubeletpodresourcesv1.GetPodResourcesResponse + lastErr error + ) + + // Once list is available, Get() should too. + // Retry time kept shorter. + gomega.Eventually(func() error { + lastErr = withPodResourcesV1Client(ctx, func(cli kubeletpodresourcesv1.PodResourcesListerClient) error { + reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + r, err := cli.Get(reqCtx, &kubeletpodresourcesv1.GetPodResourcesRequest{ + PodName: pod.Name, + PodNamespace: pod.Namespace, + }) + if err != nil { + return err + } + res = r + return nil + }) + return lastErr + }).WithTimeout(15*time.Second).WithPolling(1*time.Second).Should( + gomega.Succeed(), + "Expected Get to succeed with the feature gate enabled (last err: %v)", lastErr, + ) + + framework.Logf("Get result: %v", res) + gomega.Expect(res).ToNot(gomega.BeNil(), "expected not nil Get response") gomega.Expect(res.PodResources.Name).To(gomega.Equal(pod.Name)) gomega.Expect(res.PodResources.Containers).To(gomega.HaveLen(1), "expected one container") container := res.PodResources.Containers[0] @@ -1266,10 +1291,32 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso }) }) + ginkgo.It("should return the expected responses", func(ctx context.Context) { + onlineCPUs, err := getOnlineCPUs() + framework.ExpectNoError(err, "getOnlineCPUs() failed err: %v", err) + + waitForPodResourcesV1Serving(ctx) + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + defer func() { + framework.ExpectNoError(conn.Close()) + }() + + podresourcesListTests(ctx, f, cli, nil, false) + podresourcesGetAllocatableResourcesTests(ctx, cli, nil, onlineCPUs, reservedSystemCPUs) + podresourcesGetTests(ctx, f, cli, false) + }) + framework.It("should return the expected responses", framework.WithNodeConformance(), func(ctx context.Context) { onlineCPUs, err := getOnlineCPUs() framework.ExpectNoError(err, "getOnlineCPUs() failed err: %v", err) + waitForPodResourcesV1Serving(ctx) + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) @@ -1605,7 +1652,7 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso cli, podresConn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) framework.ExpectNoError(err) - gomega.Consistently(func(ctx context.Context) error { + gomega.Eventually(func(ctx context.Context) error { found, err := getPodResourcesValues(ctx, cli) if err != nil { return err @@ -1616,6 +1663,16 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso return nil }).WithContext(ctx).WithTimeout(30 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed()) + gomega.Consistently(func(ctx context.Context) error { + found, err := getPodResourcesValues(ctx, cli) + if err != nil { + return err + } + if len(found) > 0 { + return fmt.Errorf("returned unexpected pods: %v", found) + } + return nil + }).WithContext(ctx).WithTimeout(10 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed()) }, ginkgo.Entry("cpu and mem single", context.TODO(), 1000, 1), ginkgo.Entry("cpu and mem multi", context.TODO(), 1000, 3), @@ -1848,7 +1905,11 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso cli, podresConn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) framework.ExpectNoError(err) - gomega.Consistently(func(ctx context.Context) error { + // After all pods terminate, PodResources should eventually stop reporting them. + // gomega.Consistently was used here, but CI showed transient failures like: + // "Failed after 0.001s.: received 1 pods, expected 0" immediately after termination due to lags + // Use Eventually to allow the system to converge before asserting emptiness. + gomega.Eventually(func(ctx context.Context) error { found, err := getPodResourcesValues(ctx, cli) if err != nil { return err @@ -1858,6 +1919,15 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso // this is why we introduced the FeatureGate in the first place return matchPodDescWithResourcesNamesOnly(expected, found) }).WithContext(ctx).WithTimeout(30 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed()) + // for more robust check, also do a Consistently check + gomega.Consistently(func(ctx context.Context) error { + found, err := getPodResourcesValues(ctx, cli) + if err != nil { + return err + } + return matchPodDescWithResourcesNamesOnly(expected, found) + }).WithContext(ctx).WithTimeout(10 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed()) + }, ginkgo.Entry("cpu and mem single", context.TODO(), 1000, 1), ginkgo.Entry("cpu and mem multi", context.TODO(), 1000, 3), @@ -2141,6 +2211,7 @@ func getPodResourcesFromList(ctx context.Context, cli kubeletpodresourcesv1.PodR if err != nil { return nil, err } + for _, pr := range resp.GetPodResources() { if pr.GetName() == podName && pr.GetNamespace() == podNamespace { return pr, nil @@ -2157,21 +2228,25 @@ func preparePodResourcesListVsGet(pr *kubeletpodresourcesv1.PodResources) *kubel out := proto.Clone(pr).(*kubeletpodresourcesv1.PodResources) // sort containers by name. + //nolint:modernize // keep sort.Slice for compatibility with supported Go versions sort.Slice(out.Containers, func(i, j int) bool { return out.Containers[i].GetName() < out.Containers[j].GetName() }) for _, c := range out.Containers { // sort CPU IDs. + //nolint:modernize // keep sort.Slice for compatibility with supported Go versions sort.Slice(c.CpuIds, func(i, j int) bool { return c.CpuIds[i] < c.CpuIds[j] }) // sort devices by resource name, then sort device IDs. + //nolint:modernize // keep sort.Slice for compatibility with supported Go versions sort.Slice(c.Devices, func(i, j int) bool { return c.Devices[i].GetResourceName() < c.Devices[j].GetResourceName() }) for _, d := range c.Devices { + //nolint:modernize // keep sort.Strings for compatibility with supported Go versions sort.Strings(d.DeviceIds) // Topology isn't part of the List/Get consistency check. d.Topology = nil @@ -2197,31 +2272,83 @@ func comparePodResourcesListVsGet(listPR, getPR *kubeletpodresourcesv1.PodResour lPR := preparePodResourcesListVsGet(listPR) gPR := preparePodResourcesListVsGet(getPR) - if diff := cmp.Diff(lPR, gPR); diff != "" { - return fmt.Errorf("List() vs Get() PodResources mismatch (-list +get):\n%s", diff) + if proto.Equal(lPR, gPR) { + return nil } - return nil + + jsonOpts := protojson.MarshalOptions{UseProtoNames: true, EmitUnpopulated: true} + ljson, lerr := jsonOpts.Marshal(lPR) + gjson, gerr := jsonOpts.Marshal(gPR) + if lerr == nil && gerr == nil { + if diff := cmp.Diff(string(ljson), string(gjson)); diff != "" { + return fmt.Errorf("List() vs Get() PodResources mismatch (-list +get):\n%s", diff) + } + } + + return fmt.Errorf("List() vs Get() PodResources mismatch") } func expectListAndGetConsistent(ctx context.Context, cli kubeletpodresourcesv1.PodResourcesListerClient, podName, podNamespace string) { gomega.Eventually(func(ctx context.Context) error { - listPR, err := getPodResourcesFromList(ctx, cli, podName, podNamespace) - if err != nil { - return err - } - - getResp, err := cli.Get(ctx, &kubeletpodresourcesv1.GetPodResourcesRequest{ - PodName: podName, - PodNamespace: podNamespace, + // retry in case of kubelet/podresources restarts. + return withPodResourcesV1Client(ctx, func(c kubeletpodresourcesv1.PodResourcesListerClient) error { + listPR, err := getPodResourcesFromList(ctx, c, podName, podNamespace) + if err != nil { + return err + } + getResp, err := c.Get(ctx, &kubeletpodresourcesv1.GetPodResourcesRequest{ + PodName: podName, + PodNamespace: podNamespace, + }) + if err != nil { + return err + } + return comparePodResourcesListVsGet(listPR, getResp.GetPodResources()) }) - if err != nil { - return err - } - - return comparePodResourcesListVsGet(listPR, getResp.GetPodResources()) }). WithContext(ctx). WithPolling(5 * time.Second). WithTimeout(1 * time.Minute). Should(gomega.Succeed()) } + +// withPodResourcesV1Client dials the kubelet podresources unix socket, executes fn, and closes the conn. +// intended to be used from Eventually() to avoid holding a stale connection across kubelet restarts. +func withPodResourcesV1Client( + ctx context.Context, + fn func(cli kubeletpodresourcesv1.PodResourcesListerClient) error, +) error { + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + if err != nil { + return err + } + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + if err != nil { + return err + } + + defer func() { _ = conn.Close() }() + + return fn(cli) +} + +// waitForPodResourcesV1Serving blocks until the PodResources v1 endpoint is serving requests. +// gRPC server may temporarily refuse connections even after pods become Ready, +// due to kubelet restarts. +// CI log: failed [FAILED] Expected Get to succeed with the feature gate enabled: rpc error: code = Canceled desc = grpc: the client connection is closing. +func waitForPodResourcesV1Serving(ctx context.Context) { + var lastErr error + gomega.Eventually(func() error { + lastErr = withPodResourcesV1Client(ctx, func(cli kubeletpodresourcesv1.PodResourcesListerClient) error { + reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := cli.List(reqCtx, &kubeletpodresourcesv1.ListPodResourcesRequest{}) + return err + }) + return lastErr + }).WithTimeout(1*time.Minute).WithPolling(1*time.Second).Should( + gomega.Succeed(), + "PodResources endpoint did not become ready (last err: %v)", lastErr, + ) +}