e2e: node: podresources: fix grpc connection issues when kubelet restarts

Signed-off-by: Swati Gupta <swatig@nvidia.com>
This commit is contained in:
Swati Gupta 2026-02-09 14:11:07 -08:00
parent a555870578
commit 80fdfe2b72

View file

@ -32,6 +32,7 @@ import (
"github.com/onsi/gomega/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
v1 "k8s.io/api/core/v1"
@ -963,8 +964,6 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel
tpd = newTestPodData()
ginkgo.By("checking the output when only pods which don't require resources are present")
expectListAndGetConsistent(ctx, cli, "pod-00", f.Namespace.Name)
expected = []podDesc{
{
podName: "pod-00",
@ -972,6 +971,7 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel
},
}
tpd.createPodsForTest(ctx, f, expected)
expectListAndGetConsistent(ctx, cli, "pod-00", f.Namespace.Name)
resp = podresourcesGetWithRetry(ctx, cli, f.Namespace.Name, "pod-00")
podResourceList = []*kubeletpodresourcesv1.PodResources{resp.GetPodResources()}
@ -983,8 +983,6 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel
tpd = newTestPodData()
ginkgo.By("checking the output when only pod require CPU")
expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name)
expected = []podDesc{
{
podName: "pod-01",
@ -993,6 +991,7 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel
},
}
tpd.createPodsForTest(ctx, f, expected)
expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name)
resp = podresourcesGetWithRetry(ctx, cli, f.Namespace.Name, "pod-01")
podResourceList = []*kubeletpodresourcesv1.PodResources{resp.GetPodResources()}
@ -1004,8 +1003,6 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel
tpd = newTestPodData()
ginkgo.By("checking the output when a pod has multiple containers and only one of them requires exclusive CPUs")
expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name)
expected = []podDesc{
{
podName: "pod-01",
@ -1028,6 +1025,7 @@ func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubel
},
}
tpd.createPodsForTest(ctx, f, expected)
expectListAndGetConsistent(ctx, cli, "pod-01", f.Namespace.Name)
resp = podresourcesGetWithRetry(ctx, cli, f.Namespace.Name, "pod-01")
podResourceList = []*kubeletpodresourcesv1.PodResources{resp.GetPodResources()}
@ -1236,13 +1234,6 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
})
ginkgo.It("should succeed when calling Get for a valid pod", func(ctx context.Context) {
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() faild err: %v", err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
defer framework.ExpectNoError(conn.Close())
ginkgo.By("checking Get succeeds when the feature gate is enabled")
pd := podDesc{
podName: "fg-enabled-pod",
@ -1252,10 +1243,44 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
pod := makePodResourcesTestPod(pd)
pod = e2epod.NewPodClient(f).Create(ctx, pod)
defer e2epod.NewPodClient(f).DeleteSync(ctx, pod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
err = e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady)
err := e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady)
framework.ExpectNoError(err)
res := podresourcesGetWithRetry(ctx, cli, pod.Namespace, pod.Name)
// Kubelet restarts when applying CPUManager static policy;
// podresources socket may not be immediately ready.
// Retry with a fresh connection.
waitForPodResourcesV1Serving(ctx)
var (
res *kubeletpodresourcesv1.GetPodResourcesResponse
lastErr error
)
// Once list is available, Get() should too.
// Retry time kept shorter.
gomega.Eventually(func() error {
lastErr = withPodResourcesV1Client(ctx, func(cli kubeletpodresourcesv1.PodResourcesListerClient) error {
reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
r, err := cli.Get(reqCtx, &kubeletpodresourcesv1.GetPodResourcesRequest{
PodName: pod.Name,
PodNamespace: pod.Namespace,
})
if err != nil {
return err
}
res = r
return nil
})
return lastErr
}).WithTimeout(15*time.Second).WithPolling(1*time.Second).Should(
gomega.Succeed(),
"Expected Get to succeed with the feature gate enabled (last err: %v)", lastErr,
)
framework.Logf("Get result: %v", res)
gomega.Expect(res).ToNot(gomega.BeNil(), "expected not nil Get response")
gomega.Expect(res.PodResources.Name).To(gomega.Equal(pod.Name))
gomega.Expect(res.PodResources.Containers).To(gomega.HaveLen(1), "expected one container")
container := res.PodResources.Containers[0]
@ -1266,10 +1291,32 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
})
})
ginkgo.It("should return the expected responses", func(ctx context.Context) {
onlineCPUs, err := getOnlineCPUs()
framework.ExpectNoError(err, "getOnlineCPUs() failed err: %v", err)
waitForPodResourcesV1Serving(ctx)
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
defer func() {
framework.ExpectNoError(conn.Close())
}()
podresourcesListTests(ctx, f, cli, nil, false)
podresourcesGetAllocatableResourcesTests(ctx, cli, nil, onlineCPUs, reservedSystemCPUs)
podresourcesGetTests(ctx, f, cli, false)
})
framework.It("should return the expected responses", framework.WithNodeConformance(), func(ctx context.Context) {
onlineCPUs, err := getOnlineCPUs()
framework.ExpectNoError(err, "getOnlineCPUs() failed err: %v", err)
waitForPodResourcesV1Serving(ctx)
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
@ -1605,7 +1652,7 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
cli, podresConn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
gomega.Consistently(func(ctx context.Context) error {
gomega.Eventually(func(ctx context.Context) error {
found, err := getPodResourcesValues(ctx, cli)
if err != nil {
return err
@ -1616,6 +1663,16 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
return nil
}).WithContext(ctx).WithTimeout(30 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed())
gomega.Consistently(func(ctx context.Context) error {
found, err := getPodResourcesValues(ctx, cli)
if err != nil {
return err
}
if len(found) > 0 {
return fmt.Errorf("returned unexpected pods: %v", found)
}
return nil
}).WithContext(ctx).WithTimeout(10 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed())
},
ginkgo.Entry("cpu and mem single", context.TODO(), 1000, 1),
ginkgo.Entry("cpu and mem multi", context.TODO(), 1000, 3),
@ -1848,7 +1905,11 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
cli, podresConn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
gomega.Consistently(func(ctx context.Context) error {
// After all pods terminate, PodResources should eventually stop reporting them.
// gomega.Consistently was used here, but CI showed transient failures like:
// "Failed after 0.001s.: received 1 pods, expected 0" immediately after termination due to lags
// Use Eventually to allow the system to converge before asserting emptiness.
gomega.Eventually(func(ctx context.Context) error {
found, err := getPodResourcesValues(ctx, cli)
if err != nil {
return err
@ -1858,6 +1919,15 @@ var _ = SIGDescribe("POD Resources API", framework.WithSerial(), feature.PodReso
// this is why we introduced the FeatureGate in the first place
return matchPodDescWithResourcesNamesOnly(expected, found)
}).WithContext(ctx).WithTimeout(30 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed())
// for more robust check, also do a Consistently check
gomega.Consistently(func(ctx context.Context) error {
found, err := getPodResourcesValues(ctx, cli)
if err != nil {
return err
}
return matchPodDescWithResourcesNamesOnly(expected, found)
}).WithContext(ctx).WithTimeout(10 * time.Second).WithPolling(2 * time.Second).Should(gomega.Succeed())
},
ginkgo.Entry("cpu and mem single", context.TODO(), 1000, 1),
ginkgo.Entry("cpu and mem multi", context.TODO(), 1000, 3),
@ -2141,6 +2211,7 @@ func getPodResourcesFromList(ctx context.Context, cli kubeletpodresourcesv1.PodR
if err != nil {
return nil, err
}
for _, pr := range resp.GetPodResources() {
if pr.GetName() == podName && pr.GetNamespace() == podNamespace {
return pr, nil
@ -2157,21 +2228,25 @@ func preparePodResourcesListVsGet(pr *kubeletpodresourcesv1.PodResources) *kubel
out := proto.Clone(pr).(*kubeletpodresourcesv1.PodResources)
// sort containers by name.
//nolint:modernize // keep sort.Slice for compatibility with supported Go versions
sort.Slice(out.Containers, func(i, j int) bool {
return out.Containers[i].GetName() < out.Containers[j].GetName()
})
for _, c := range out.Containers {
// sort CPU IDs.
//nolint:modernize // keep sort.Slice for compatibility with supported Go versions
sort.Slice(c.CpuIds, func(i, j int) bool {
return c.CpuIds[i] < c.CpuIds[j]
})
// sort devices by resource name, then sort device IDs.
//nolint:modernize // keep sort.Slice for compatibility with supported Go versions
sort.Slice(c.Devices, func(i, j int) bool {
return c.Devices[i].GetResourceName() < c.Devices[j].GetResourceName()
})
for _, d := range c.Devices {
//nolint:modernize // keep sort.Strings for compatibility with supported Go versions
sort.Strings(d.DeviceIds)
// Topology isn't part of the List/Get consistency check.
d.Topology = nil
@ -2197,31 +2272,83 @@ func comparePodResourcesListVsGet(listPR, getPR *kubeletpodresourcesv1.PodResour
lPR := preparePodResourcesListVsGet(listPR)
gPR := preparePodResourcesListVsGet(getPR)
if diff := cmp.Diff(lPR, gPR); diff != "" {
return fmt.Errorf("List() vs Get() PodResources mismatch (-list +get):\n%s", diff)
if proto.Equal(lPR, gPR) {
return nil
}
return nil
jsonOpts := protojson.MarshalOptions{UseProtoNames: true, EmitUnpopulated: true}
ljson, lerr := jsonOpts.Marshal(lPR)
gjson, gerr := jsonOpts.Marshal(gPR)
if lerr == nil && gerr == nil {
if diff := cmp.Diff(string(ljson), string(gjson)); diff != "" {
return fmt.Errorf("List() vs Get() PodResources mismatch (-list +get):\n%s", diff)
}
}
return fmt.Errorf("List() vs Get() PodResources mismatch")
}
func expectListAndGetConsistent(ctx context.Context, cli kubeletpodresourcesv1.PodResourcesListerClient, podName, podNamespace string) {
gomega.Eventually(func(ctx context.Context) error {
listPR, err := getPodResourcesFromList(ctx, cli, podName, podNamespace)
if err != nil {
return err
}
getResp, err := cli.Get(ctx, &kubeletpodresourcesv1.GetPodResourcesRequest{
PodName: podName,
PodNamespace: podNamespace,
// retry in case of kubelet/podresources restarts.
return withPodResourcesV1Client(ctx, func(c kubeletpodresourcesv1.PodResourcesListerClient) error {
listPR, err := getPodResourcesFromList(ctx, c, podName, podNamespace)
if err != nil {
return err
}
getResp, err := c.Get(ctx, &kubeletpodresourcesv1.GetPodResourcesRequest{
PodName: podName,
PodNamespace: podNamespace,
})
if err != nil {
return err
}
return comparePodResourcesListVsGet(listPR, getResp.GetPodResources())
})
if err != nil {
return err
}
return comparePodResourcesListVsGet(listPR, getResp.GetPodResources())
}).
WithContext(ctx).
WithPolling(5 * time.Second).
WithTimeout(1 * time.Minute).
Should(gomega.Succeed())
}
// withPodResourcesV1Client dials the kubelet podresources unix socket, executes fn, and closes the conn.
// intended to be used from Eventually() to avoid holding a stale connection across kubelet restarts.
func withPodResourcesV1Client(
ctx context.Context,
fn func(cli kubeletpodresourcesv1.PodResourcesListerClient) error,
) error {
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
if err != nil {
return err
}
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
return fn(cli)
}
// waitForPodResourcesV1Serving blocks until the PodResources v1 endpoint is serving requests.
// gRPC server may temporarily refuse connections even after pods become Ready,
// due to kubelet restarts.
// CI log: failed [FAILED] Expected Get to succeed with the feature gate enabled: rpc error: code = Canceled desc = grpc: the client connection is closing.
func waitForPodResourcesV1Serving(ctx context.Context) {
var lastErr error
gomega.Eventually(func() error {
lastErr = withPodResourcesV1Client(ctx, func(cli kubeletpodresourcesv1.PodResourcesListerClient) error {
reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := cli.List(reqCtx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
return err
})
return lastErr
}).WithTimeout(1*time.Minute).WithPolling(1*time.Second).Should(
gomega.Succeed(),
"PodResources endpoint did not become ready (last err: %v)", lastErr,
)
}