From 0e3a35f9abccd4c0cc42e717866392920b9cafdd Mon Sep 17 00:00:00 2001 From: Xinyun Liu Date: Tue, 25 Nov 2025 04:03:10 +0000 Subject: [PATCH 1/4] DRA-like fix for device-plugin --- pkg/kubelet/cm/devicemanager/endpoint.go | 7 ++++ pkg/kubelet/cm/devicemanager/endpoint_test.go | 10 ++--- pkg/kubelet/cm/devicemanager/manager.go | 38 +++++++++++++++---- pkg/kubelet/cm/devicemanager/manager_test.go | 16 ++++++-- .../cm/devicemanager/plugin/v1beta1/api.go | 2 +- .../cm/devicemanager/plugin/v1beta1/client.go | 3 +- .../devicemanager/plugin/v1beta1/handler.go | 37 ++++++++++++------ .../cm/devicemanager/plugin/v1beta1/server.go | 14 ++++--- 8 files changed, 93 insertions(+), 34 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go index f9f0f2845be..c0b65ed69a3 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint.go +++ b/pkg/kubelet/cm/devicemanager/endpoint.go @@ -36,11 +36,13 @@ type endpoint interface { setStopTime(t time.Time) isStopped() bool stopGracePeriodExpired() bool + socketPath() string } type endpointImpl struct { mutex sync.Mutex resourceName string + socket string api pluginapi.DevicePluginClient stopTime time.Time client plugin.Client // for testing only @@ -52,6 +54,7 @@ func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl { return &endpointImpl{ api: p.API(), resourceName: p.Resource(), + socket: p.SocketPath(), } } @@ -64,6 +67,10 @@ func newStoppedEndpointImpl(resourceName string) *endpointImpl { } } +func (e *endpointImpl) socketPath() string { + return e.socket +} + func (e *endpointImpl) isStopped() bool { e.mutex.Lock() defer e.mutex.Unlock() diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index 726dd7f4342..ecc52da3715 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -43,7 +43,7 @@ func newMockPluginManager() *mockPluginManager { return &mockPluginManager{ 
func(string) error { return nil }, func(string, plugin.DevicePlugin) error { return nil }, - func(klog.Logger, string) {}, + func(klog.Logger, string, string) {}, func(string, *pluginapi.ListAndWatchResponse) {}, } } @@ -51,7 +51,7 @@ func newMockPluginManager() *mockPluginManager { type mockPluginManager struct { cleanupPluginDirectory func(string) error pluginConnected func(string, plugin.DevicePlugin) error - pluginDisconnected func(klog.Logger, string) + pluginDisconnected func(klog.Logger, string, string) pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse) } @@ -63,8 +63,8 @@ func (m *mockPluginManager) PluginConnected(_ context.Context, r string, p plugi return m.pluginConnected(r, p) } -func (m *mockPluginManager) PluginDisconnected(logger klog.Logger, r string) { - m.pluginDisconnected(logger, r) +func (m *mockPluginManager) PluginDisconnected(logger klog.Logger, r string, s string) { + m.pluginDisconnected(logger, r, s) } func (m *mockPluginManager) PluginListAndWatchReceiver(_ klog.Logger, r string, lr *pluginapi.ListAndWatchResponse) { @@ -288,7 +288,7 @@ func esetup(ctx context.Context, t *testing.T, devs []*pluginapi.Device, socket, e := newEndpointImpl(dp) e.client = c - m.pluginDisconnected = func(logger klog.Logger, r string) { + m.pluginDisconnected = func(logger klog.Logger, r string, s string) { e.setStopTime(time.Now()) } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index cd796773a2c..bf2f73a9d74 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "sort" "sync" "time" @@ -62,6 +63,8 @@ type ActivePodsFunc func() []*v1.Pod type ManagerImpl struct { checkpointdir string + endpointStore map[string][]*endpointInfo // Key is ResourceName + endpoints map[string]endpointInfo // Key is ResourceName mutex sync.Mutex @@ -149,8 +152,8 @@ func newManagerImpl(logger 
klog.Logger, socketPath string, topology []cadvisorap } manager := &ManagerImpl{ - endpoints: make(map[string]endpointInfo), - + endpoints: make(map[string]endpointInfo), + endpointStore: make(map[string][]*endpointInfo), allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.Set[string]), unhealthyDevices: make(map[string]sets.Set[string]), @@ -236,7 +239,16 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string, m.mutex.Lock() defer m.mutex.Unlock() + if m.endpointStore == nil { + m.endpointStore = make(map[string][]*endpointInfo) + } + for _, oldEndpoint := range m.endpointStore[resourceName] { + if oldEndpoint.e.socketPath() == e.socketPath() { + return fmt.Errorf("device plugin already connected: %s", e.socketPath()) + } + } m.endpoints[resourceName] = endpointInfo{e, options} + m.endpointStore[resourceName] = append(m.endpointStore[resourceName], &endpointInfo{e, options}) logger.V(2).Info("Device plugin connected", "resourceName", resourceName) return nil @@ -244,15 +256,27 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string, // PluginDisconnected is to disconnect a plugin from an endpoint. // This is done as part of device plugin deregistration. 
-func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string) { +func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string, socketPath string) { m.mutex.Lock() defer m.mutex.Unlock() - if ep, exists := m.endpoints[resourceName]; exists { + endpoints := m.endpointStore[resourceName] + i := slices.IndexFunc(endpoints, func(ep *endpointInfo) bool { + return ep.e.socketPath() == socketPath + }) + if i == -1 { + return + } + m.endpoints[resourceName].e.setStopTime(time.Now()) + last := len(endpoints) == 1 + if last { + // delete(m.endpoints, resourceName) + delete(m.endpointStore, resourceName) m.markResourceUnhealthy(logger, resourceName) - logger.V(2).Info("Endpoint became unhealthy", "resourceName", resourceName) - - ep.e.setStopTime(time.Now()) + logger.V(2).Info("Last DP for this resource disconnected", "resource", resourceName) + } else { + m.endpointStore[resourceName] = slices.Delete(endpoints, i, i+1) + m.endpoints[resourceName] = *m.endpointStore[resourceName][len(m.endpointStore[resourceName])-1] } } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 38add8678e7..096f372bcad 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -432,9 +432,11 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Tests adding another resource. 
resourceName2 := "resource2" - e2 := &endpointImpl{} + e2 := &endpointImpl{socket: socketName} e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager) - testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} + eInfo := endpointInfo{e: e2, opts: nil} + testManager.endpoints[resourceName2] = eInfo + testManager.endpointStore[resourceName2] = []*endpointInfo{&eInfo} callback(logger, resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() as.Len(capacity, 2) @@ -808,6 +810,7 @@ type MockEndpoint struct { getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error) initChan chan []string + socket string } func (m *MockEndpoint) preStartContainer(_ context.Context, devs []string) (*pluginapi.PreStartContainerResponse, error) { @@ -835,6 +838,10 @@ func (m *MockEndpoint) isStopped() bool { return false } func (m *MockEndpoint) stopGracePeriodExpired() bool { return false } +func (m *MockEndpoint) socketPath() string { + return m.socket +} + func makePod(limits v1.ResourceList) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -2160,9 +2167,12 @@ func TestEndpointSyncOnDisconnect(t *testing.T) { resourceName: resourceName, client: plugin.NewPluginClient(resourceName, socketName, manager), stopTime: time.Now().Add(-endpointStopGracePeriod * 2), // make the grace period expired + socket: socketName, } - manager.endpoints[resourceName] = endpointInfo{e: ep, opts: nil} + eInfo := endpointInfo{e: ep, opts: nil} + manager.endpoints[resourceName] = eInfo + manager.endpointStore[resourceName] = []*endpointInfo{&eInfo} devs := []*pluginapi.Device{ {ID: "Device1", Health: pluginapi.Healthy}, {ID: "Device2", Health: pluginapi.Healthy}, diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go index 
b400a212564..122796138dc 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go @@ -32,7 +32,7 @@ type RegistrationHandler interface { // ClientHandler is an interface for handling device plugin connections. type ClientHandler interface { PluginConnected(context.Context, string, DevicePlugin) error - PluginDisconnected(klog.Logger, string) + PluginDisconnected(klog.Logger, string, string) PluginListAndWatchReceiver(klog.Logger, string, *api.ListAndWatchResponse) } diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go index a6e0e79c10f..9655086f6ca 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go @@ -42,6 +42,7 @@ type Client interface { Connect(context.Context) error Run(context.Context) Disconnect(klog.Logger) error + SocketPath() string } type client struct { @@ -109,7 +110,7 @@ func (c *client) Disconnect(logger klog.Logger) error { c.grpc = nil } c.mutex.Unlock() - c.handler.PluginDisconnected(logger, c.resource) + c.handler.PluginDisconnected(logger, c.resource, c.socket) logger.V(2).Info("Device plugin disconnected", "resource", c.resource) return nil diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index 472399ffe6d..3ca40bb7202 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "slices" "time" core "k8s.io/api/core/v1" @@ -54,10 +55,11 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s func (s *server) DeRegisterPlugin(pluginName, endpoint string) { logger := klog.FromContext(context.TODO()) logger.V(2).Info("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint) - client := s.getClient(pluginName) + // 
endpoint in DeRegisterPlugin is the socket path + client := s.getClient(pluginName, endpoint) if client != nil { if err := s.disconnectClient(logger, pluginName, client); err != nil { - logger.Error(err, "disconnecting client", "plugin", pluginName, "endpoing", endpoint) + logger.Error(err, "disconnecting client", "plugin", pluginName, "endpoint", endpoint) } } } @@ -86,7 +88,7 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri s.registerClient(logger, name, c) if err := c.Connect(ctx); err != nil { - s.deregisterClient(logger, name) + s.deregisterClient(logger, name, socketPath) logger.Error(err, "Failed to connect to new client", "resource", name) return err } @@ -100,30 +102,38 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri } func (s *server) disconnectClient(logger klog.Logger, name string, c Client) error { - s.deregisterClient(logger, name) + s.deregisterClient(logger, name, c.SocketPath()) return c.Disconnect(logger) } func (s *server) registerClient(logger klog.Logger, name string, c Client) { s.mutex.Lock() defer s.mutex.Unlock() - s.clients[name] = c + s.clients[name] = append(s.clients[name], c) logger.V(2).Info("Registered client", "name", name) } -func (s *server) deregisterClient(logger klog.Logger, name string) { +func (s *server) deregisterClient(logger klog.Logger, name string, socketPath string) { s.mutex.Lock() defer s.mutex.Unlock() - delete(s.clients, name) - logger.V(2).Info("Deregistered client", "name", name) + for i, c := range s.clients[name] { + if c.SocketPath() == socketPath { + s.clients[name] = slices.Delete(s.clients[name], i, i+1) + if len(s.clients[name]) == 0 { + delete(s.clients, name) + } + logger.V(2).Info("Deregistered client", "name", name, "socketPath", socketPath) + break + } + } } func (s *server) runClient(ctx context.Context, name string, c Client) { logger := klog.FromContext(ctx) c.Run(ctx) - c = s.getClient(name) + c = s.getClient(name, 
c.SocketPath()) if c == nil { return } @@ -133,8 +143,13 @@ func (s *server) runClient(ctx context.Context, name string, c Client) { } } -func (s *server) getClient(name string) Client { +func (s *server) getClient(name string, socketPath string) Client { s.mutex.Lock() defer s.mutex.Unlock() - return s.clients[name] + for _, c := range s.clients[name] { + if c.SocketPath() == socketPath { + return c + } + } + return nil } diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go index 320ee4d80ea..6372469c1fb 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go @@ -55,7 +55,7 @@ type server struct { grpc *grpc.Server rhandler RegistrationHandler chandler ClientHandler - clients map[string]Client + clients map[string][]Client // lastError records the last runtime error. A server is considered healthy till an actual error occurs. lastError error @@ -77,7 +77,7 @@ func NewServer(logger klog.Logger, socketPath string, rh RegistrationHandler, ch socketDir: dir, rhandler: rh, chandler: ch, - clients: make(map[string]Client), + clients: make(map[string][]Client), } return s, nil @@ -198,10 +198,12 @@ func (s *server) isVersionCompatibleWithPlugin(versions ...string) bool { func (s *server) visitClients(visit func(r string, c Client)) { s.mutex.Lock() - for r, c := range s.clients { - s.mutex.Unlock() - visit(r, c) - s.mutex.Lock() + for r, cs := range s.clients { + for _, c := range cs { + s.mutex.Unlock() + visit(r, c) + s.mutex.Lock() + } } s.mutex.Unlock() } From 9d80dc279b4ac189c9e182c316ba41bcfc2328e5 Mon Sep 17 00:00:00 2001 From: Xinyun Liu Date: Tue, 10 Feb 2026 07:06:01 +0000 Subject: [PATCH 2/4] E2E test for dra-like fix for device plugin --- pkg/kubelet/cm/devicemanager/manager.go | 1 - test/e2e_node/device_plugin_test.go | 82 ++++++++++++++++++++----- test/e2e_node/podresources_test.go | 2 +- 3 files changed, 68 
insertions(+), 17 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index bf2f73a9d74..24bc5d18ff4 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -270,7 +270,6 @@ func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string m.endpoints[resourceName].e.setStopTime(time.Now()) last := len(endpoints) == 1 if last { - // delete(m.endpoints, resourceName) delete(m.endpointStore, resourceName) m.markResourceUnhealthy(logger, resourceName) logger.V(2).Info("Last DP for this resource disconnected", "resource", resourceName) diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index 964898fb9df..c027669a82c 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -130,7 +130,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { } f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { - var devicePluginPod, dptemplate *v1.Pod + var devicePluginPod, devicePluginPod2, dptemplate, dptemplate2 *v1.Pod var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse var err error @@ -170,9 +170,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) ginkgo.By("Scheduling a sample device plugin pod") - dp := getSampleDevicePluginPod(pluginSockDir) + dp := getSampleDevicePluginPod(pluginSockDir, "dp1") + dp2 := getSampleDevicePluginPod(pluginSockDir, "dp2") dptemplate = dp.DeepCopy() + dptemplate2 = dp2.DeepCopy() devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) ginkgo.By("Waiting for devices to become available on the local node") gomega.Eventually(ctx, func(ctx context.Context) bool { @@ -193,6 +196,7 @@ 
func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.AfterEach(func(ctx context.Context) { ginkgo.By("Deleting the device plugin pod") e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) ginkgo.By("Deleting any Pods created by the test") l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) @@ -236,8 +240,8 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.Logf("len(v1alphaPodResources.PodResources):%+v", len(v1alphaPodResources.PodResources)) framework.Logf("len(v1PodResources.PodResources):%+v", len(v1PodResources.PodResources)) - gomega.Expect(v1alphaPodResources.PodResources).To(gomega.HaveLen(2)) - gomega.Expect(v1PodResources.PodResources).To(gomega.HaveLen(2)) + gomega.Expect(v1alphaPodResources.PodResources).To(gomega.HaveLen(3)) + gomega.Expect(v1PodResources.PodResources).To(gomega.HaveLen(3)) var v1alphaResourcesForOurPod *kubeletpodresourcesv1alpha1.PodResources for _, res := range v1alphaPodResources.GetPodResources() { @@ -342,6 +346,29 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) + ginkgo.It("Keeps device plugin assignments after one of the device plugins is deleted", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started 
successfully without") + + ginkgo.By("Deleting the first device plugin") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + + ginkgo.By("Verifying the device assignment is preserved after deleting one device plugin") + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + v1PodResources, err = getV1NodeDevices(ctx) + framework.ExpectNoError(err) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after deleting second device plugin") + }) + // simulate kubelet restart. A compliant device plugin is expected to re-register, while the pod and the container stays running. // The flow with buggy or slow device plugin is deferred to another test. 
// The device assignment should be kept and be stable across the kubelet restart, because it's the kubelet which performs the device allocation, @@ -487,6 +514,8 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { gomega.Expect(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace)).To(gomega.Succeed()) ginkgo.By("Deleting the device plugin") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) @@ -536,11 +565,16 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { } e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, f.Timeouts.PodDelete) waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, deleteOptions, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) ginkgo.By("Recreating the plugin pod") - devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) - err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) - framework.ExpectNoError(err) + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) + err1 := e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod2.Name, devicePluginPod2.Namespace, 1*time.Minute) + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate2) + err2 := e2epod.WaitTimeoutForPodRunningInNamespace(ctx, 
f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) + framework.ExpectNoError(err1) + framework.ExpectNoError(err2) ginkgo.By("Waiting for resource to become available on the local node after re-registration") gomega.Eventually(ctx, func() bool { @@ -613,9 +647,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { } e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, f.Timeouts.PodDelete) waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, deleteOptions, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) ginkgo.By("Recreating the plugin pod") devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate2) ginkgo.By("Waiting for resource to become available on the local node after restart") gomega.Eventually(ctx, func() bool { @@ -755,7 +792,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.Logf("PodResources.PodResources:%+v\n", podResources.PodResources) framework.Logf("len(PodResources.PodResources):%+v", len(podResources.PodResources)) - gomega.Expect(podResources.PodResources).To(gomega.HaveLen(2)) + gomega.Expect(podResources.PodResources).To(gomega.HaveLen(3)) var resourcesForOurPod *kubeletpodresourcesv1.PodResources for _, res := range podResources.GetPodResources() { @@ -790,7 +827,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { - var devicePluginPod *v1.Pod + var devicePluginPod, devicePluginPod2 *v1.Pod var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse var 
triggerPathFile, triggerPathDir string var err error @@ -863,14 +900,27 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { Spec: ds.Spec.Template.Spec, } + dp2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: SampleDevicePluginName + "-2", + }, + Spec: ds.Spec.Template.Spec, + } + + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) framework.Logf("Waiting for device plugin pod to be Running") - err = e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPod.Namespace, devicePluginPod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) - if err != nil { - framework.Logf("Sample Device Pod %v took too long to enter running/ready: %v", dp.Name, err) + err1 := e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPod.Namespace, devicePluginPod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) + err2 := e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPod2.Namespace, devicePluginPod2.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) + if err1 != nil { + framework.Logf("Sample Device Pod %v took too long to enter running/ready: %v", dp.Name, err1) } - framework.ExpectNoError(err, "WaitForPodCondition() failed err: %v", err) + if err2 != nil { + framework.Logf("Sample Device Pod %v took too long to enter running/ready: %v", dp2.Name, err2) + } + framework.ExpectNoError(err1, "WaitForPodCondition() failed err: %v", err1) + framework.ExpectNoError(err2, "WaitForPodCondition() failed err: %v", err2) go func() { // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE @@ -904,6 +954,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { ginkgo.AfterEach(func(ctx context.Context) { ginkgo.By("Deleting the device plugin pod") e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + e2epod.NewPodClient(f).DeleteSync(ctx, 
devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) ginkgo.By("Deleting any Pods created by the test") l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) @@ -1111,7 +1162,7 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta } // getSampleDevicePluginPod returns the Sample Device Plugin pod to be used e2e tests. -func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { +func getSampleDevicePluginPod(pluginSockDir string, uniqueName string) *v1.Pod { data, err := e2etestfiles.Read(e2enode.SampleDevicePluginDSYAML) if err != nil { framework.Fail(err.Error()) @@ -1120,7 +1171,7 @@ func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { ds := readDaemonSetV1OrDie(data) dp := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: e2enode.SampleDevicePluginName, + Name: e2enode.SampleDevicePluginName + "-" + uniqueName, }, Spec: ds.Spec.Template.Spec, } @@ -1131,6 +1182,7 @@ func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { } dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "CDI_ENABLED", Value: "1"}) + dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "UNIQUE_NAME", Value: uniqueName}) return dp } diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go index fb26493b8bb..c9ce519d5e7 100644 --- a/test/e2e_node/podresources_test.go +++ b/test/e2e_node/podresources_test.go @@ -2118,7 +2118,7 @@ func getOnlineCPUs() (cpuset.CPUSet, error) { func setupSampleDevicePluginOrFail(ctx context.Context, f *framework.Framework) *v1.Pod { e2enode.WaitForNodeToBeReady(ctx, f.ClientSet, framework.TestContext.NodeName, 5*time.Minute) - dp := getSampleDevicePluginPod(kubeletdevicepluginv1beta1.DevicePluginPath) + dp := getSampleDevicePluginPod(kubeletdevicepluginv1beta1.DevicePluginPath, "dp") dp.Spec.NodeName = framework.TestContext.NodeName ginkgo.By("Create the sample device plugin pod") From 
990b72c522e85280eef739939be90b360d1785b5 Mon Sep 17 00:00:00 2001 From: Xinyun Liu Date: Mon, 23 Mar 2026 23:05:30 +0000 Subject: [PATCH 3/4] Address comments and add more e2e tests --- pkg/kubelet/cm/devicemanager/manager.go | 40 ++++--- pkg/kubelet/cm/devicemanager/manager_test.go | 4 +- .../devicemanager/plugin/v1beta1/handler.go | 23 ++-- test/e2e_node/device_plugin_test.go | 103 +++++++++++++++++- 4 files changed, 139 insertions(+), 31 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 24bc5d18ff4..a17a69c6bfe 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -23,7 +23,6 @@ import ( "os" "path/filepath" "runtime" - "slices" "sort" "sync" "time" @@ -63,7 +62,7 @@ type ActivePodsFunc func() []*v1.Pod type ManagerImpl struct { checkpointdir string - endpointStore map[string][]*endpointInfo // Key is ResourceName + endpointStore map[string]map[string]*endpointInfo // resourceName -> socketPath -> endpointInfo endpoints map[string]endpointInfo // Key is ResourceName mutex sync.Mutex @@ -153,7 +152,7 @@ func newManagerImpl(logger klog.Logger, socketPath string, topology []cadvisorap manager := &ManagerImpl{ endpoints: make(map[string]endpointInfo), - endpointStore: make(map[string][]*endpointInfo), + endpointStore: make(map[string]map[string]*endpointInfo), allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.Set[string]), unhealthyDevices: make(map[string]sets.Set[string]), @@ -240,15 +239,16 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string, m.mutex.Lock() defer m.mutex.Unlock() if m.endpointStore == nil { - m.endpointStore = make(map[string][]*endpointInfo) + m.endpointStore = make(map[string]map[string]*endpointInfo) } - for _, oldEndpoint := range m.endpointStore[resourceName] { - if oldEndpoint.e.socketPath() == e.socketPath() { - return fmt.Errorf("device plugin already connected: %s", 
e.socketPath()) - } + if m.endpointStore[resourceName] == nil { + m.endpointStore[resourceName] = make(map[string]*endpointInfo) + } + if _, exists := m.endpointStore[resourceName][e.socketPath()]; exists { + return fmt.Errorf("device plugin already connected: %s", e.socketPath()) } m.endpoints[resourceName] = endpointInfo{e, options} - m.endpointStore[resourceName] = append(m.endpointStore[resourceName], &endpointInfo{e, options}) + m.endpointStore[resourceName][e.socketPath()] = &endpointInfo{e, options} logger.V(2).Info("Device plugin connected", "resourceName", resourceName) return nil @@ -260,22 +260,28 @@ func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string m.mutex.Lock() defer m.mutex.Unlock() - endpoints := m.endpointStore[resourceName] - i := slices.IndexFunc(endpoints, func(ep *endpointInfo) bool { - return ep.e.socketPath() == socketPath - }) - if i == -1 { + endpoints, ok := m.endpointStore[resourceName] + if !ok { return } - m.endpoints[resourceName].e.setStopTime(time.Now()) + ep, exists := endpoints[socketPath] + if !exists { + return + } + ep.e.setStopTime(time.Now()) + last := len(endpoints) == 1 if last { delete(m.endpointStore, resourceName) m.markResourceUnhealthy(logger, resourceName) logger.V(2).Info("Last DP for this resource disconnected", "resource", resourceName) } else { - m.endpointStore[resourceName] = slices.Delete(endpoints, i, i+1) - m.endpoints[resourceName] = *m.endpointStore[resourceName][len(m.endpointStore[resourceName])-1] + delete(endpoints, socketPath) + // Promote an arbitrary remaining endpoint to the primary endpoints map + for _, remainingEp := range endpoints { + m.endpoints[resourceName] = *remainingEp + break + } } } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 096f372bcad..13c5181e01c 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -436,7 +436,7 @@ func 
TestUpdateCapacityAllocatable(t *testing.T) { e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager) eInfo := endpointInfo{e: e2, opts: nil} testManager.endpoints[resourceName2] = eInfo - testManager.endpointStore[resourceName2] = []*endpointInfo{&eInfo} + testManager.endpointStore[resourceName2] = map[string]*endpointInfo{socketName: &eInfo} callback(logger, resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() as.Len(capacity, 2) @@ -2172,7 +2172,7 @@ func TestEndpointSyncOnDisconnect(t *testing.T) { eInfo := endpointInfo{e: ep, opts: nil} manager.endpoints[resourceName] = eInfo - manager.endpointStore[resourceName] = []*endpointInfo{&eInfo} + manager.endpointStore[resourceName] = map[string]*endpointInfo{socketName: &eInfo} devs := []*pluginapi.Device{ {ID: "Device1", Health: pluginapi.Healthy}, {ID: "Device2", Health: pluginapi.Healthy}, diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index 3ca40bb7202..c44a48e7fa7 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "os" - "slices" "time" core "k8s.io/api/core/v1" @@ -117,14 +116,22 @@ func (s *server) deregisterClient(logger klog.Logger, name string, socketPath st s.mutex.Lock() defer s.mutex.Unlock() - for i, c := range s.clients[name] { - if c.SocketPath() == socketPath { - s.clients[name] = slices.Delete(s.clients[name], i, i+1) - if len(s.clients[name]) == 0 { - delete(s.clients, name) - } + var newClients []Client + clientRemoved := false + for _, c := range s.clients[name] { + if !clientRemoved && c.SocketPath() == socketPath { + clientRemoved = true logger.V(2).Info("Deregistered client", "name", name, "socketPath", socketPath) - break + continue + } + newClients = append(newClients, c) + } + + if clientRemoved { + if len(newClients) == 0 { + 
delete(s.clients, name) + } else { + s.clients[name] = newClients } } } diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index c027669a82c..9a458b6ade4 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -346,9 +346,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) - ginkgo.It("Keeps device plugin assignments after one of the device plugins is deleted", func(ctx context.Context) { + ginkgo.It("Keeps device plugin assignments after delete the first device plugin", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) - pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) deviceIDRE := "stub devices: (Dev-[0-9]+)" devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) @@ -365,10 +365,105 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { v1PodResources, err = getV1NodeDevices(ctx) framework.ExpectNoError(err) - err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after deleting second device plugin") }) + ginkgo.It("Keeps device plugin assignments after delete the second device plugin", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub 
devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") + + ginkgo.By("Deleting the second device plugin") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) + + ginkgo.By("Verifying the device assignment is preserved after deleting one device plugin") + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + v1PodResources, err = getV1NodeDevices(ctx) + framework.ExpectNoError(err) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after deleting second device plugin") + }) + + ginkgo.It("Keeps device plugin assignments after the second device plugin is disconnected and reconnected", func(ctx context.Context) { + // Recreate dp2 with a 10s startup delay exclusively for this test case + // so we have a realistic disconnected window. 
+ err := e2epod.NewPodClient(f).Delete(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) + + dp2 := getSampleDevicePluginPod(pluginSockDir, "dp2") + dp2.Spec.Containers[0].Command = []string{"/bin/sh", "-c"} + dp2.Spec.Containers[0].Args = []string{"sleep 10 && exec /sampledeviceplugin -alsologtostderr"} + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be available on local node")) + + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") + + ginkgo.By("Making the second device plugin fail to respond and reconnect by killing its process") + dpPod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + initialRestartCount := dpPod.Status.ContainerStatuses[0].RestartCount + + _, _, err = e2epod.ExecCommandInContainerWithFullOutput(f, devicePluginPod2.Name, devicePluginPod2.Spec.Containers[0].Name, "kill", "1") + framework.ExpectNoError(err, "killing sampledeviceplugin process") + + ginkgo.By("Verifying that the plugin is disconnected and original device assignments are preserved") + time.Sleep(5 
* time.Second) + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + v1PodResources, err := getV1NodeDevices(ctx) + framework.ExpectNoError(err) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after disconnecting the second DP") + + ginkgo.By("Waiting for container to restart and reconnect") + gomega.Eventually(ctx, func() int { + p, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + if err != nil || len(p.Status.ContainerStatuses) == 0 { + return 0 + } + return int(p.Status.ContainerStatuses[0].RestartCount) + }, 5*time.Minute, framework.Poll).Should(gomega.BeNumerically(">", initialRestartCount)) + + ginkgo.By("Verifying the new plugin reconnected, original device assignments are preserved") + time.Sleep(10 * time.Second) + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + v1PodResources, err = getV1NodeDevices(ctx) + framework.ExpectNoError(err) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after reconnecting the second DP") + + gomega.Eventually(ctx, func() bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && + e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected resource to be available on local node")) + }) + // simulate kubelet restart. 
A compliant device plugin is expected to re-register, while the pod and the container stays running. // The flow with buggy or slow device plugin is deferred to another test. // The device assignment should be kept and be stable across the kubelet restart, because it's the kubelet which performs the device allocation, @@ -902,7 +997,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { dp2 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: SampleDevicePluginName + "-2", + Name: e2enode.SampleDevicePluginName + "-2", }, Spec: ds.Spec.Template.Spec, } From ce680fea207a0bf944457d5d92a0bfd599f37e1e Mon Sep 17 00:00:00 2001 From: Xinyun Liu Date: Tue, 21 Apr 2026 05:16:29 +0000 Subject: [PATCH 4/4] Add E2E test for multiple device-plugins scenarios --- pkg/kubelet/cm/devicemanager/manager.go | 2 +- .../devicemanager/plugin/v1beta1/handler.go | 25 +- test/e2e_node/device_plugin_multiple_test.go | 312 ++++++++++++++++++ test/e2e_node/device_plugin_test.go | 184 ++--------- 4 files changed, 348 insertions(+), 175 deletions(-) create mode 100644 test/e2e_node/device_plugin_multiple_test.go diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index a17a69c6bfe..b10f4b38366 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -274,7 +274,7 @@ func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string if last { delete(m.endpointStore, resourceName) m.markResourceUnhealthy(logger, resourceName) - logger.V(2).Info("Last DP for this resource disconnected", "resource", resourceName) + logger.V(2).Info("Last device plugin for this resource disconnected", "resource", resourceName) } else { delete(endpoints, socketPath) // Promote an arbitrary remaining endpoint to the primary endpoints map diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index 
c44a48e7fa7..990c870b1e1 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -87,12 +87,13 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri s.registerClient(logger, name, c) if err := c.Connect(ctx); err != nil { + // Undo the registration above if the connection fails, so no unconnected client stays registered. s.deregisterClient(logger, name, socketPath) - logger.Error(err, "Failed to connect to new client", "resource", name) + logger.Error(err, "Failed to connect to new client", "resource", name, "socketPath", socketPath) return err } - logger.V(2).Info("Connected to new client", "resource", name) + logger.V(2).Info("Connected to new client", "resource", name, "socketPath", socketPath) go func() { s.runClient(ctx, name, c) }() @@ -109,30 +110,30 @@ func (s *server) registerClient(logger klog.Logger, name string, c Client) { defer s.mutex.Unlock() s.clients[name] = append(s.clients[name], c) - logger.V(2).Info("Registered client", "name", name) + logger.V(2).Info("Registered client", "name", name, "socketPath", c.SocketPath()) } func (s *server) deregisterClient(logger klog.Logger, name string, socketPath string) { s.mutex.Lock() defer s.mutex.Unlock() + // When a client is deregistered, we rebuild the clients slice without the given client. + // We intentionally avoid mutating in place. + // We only remove a connection when both the client name and the socket path match. + // This ensures that if there are two connections with the same client name, only the specified client is removed.
var newClients []Client - clientRemoved := false for _, c := range s.clients[name] { - if !clientRemoved && c.SocketPath() == socketPath { - clientRemoved = true + if c.SocketPath() == socketPath { logger.V(2).Info("Deregistered client", "name", name, "socketPath", socketPath) continue } newClients = append(newClients, c) } - if clientRemoved { - if len(newClients) == 0 { - delete(s.clients, name) - } else { - s.clients[name] = newClients - } + if len(newClients) == 0 { + delete(s.clients, name) + } else { + s.clients[name] = newClients } } diff --git a/test/e2e_node/device_plugin_multiple_test.go b/test/e2e_node/device_plugin_multiple_test.go new file mode 100644 index 00000000000..334e8145a42 --- /dev/null +++ b/test/e2e_node/device_plugin_multiple_test.go @@ -0,0 +1,312 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package e2enode + +import ( + "context" + "fmt" + "path/filepath" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" + "k8s.io/kubernetes/test/e2e/feature" + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + admissionapi "k8s.io/pod-security-admission/api" +) + +// Serial because the test restarts Kubelet +var _ = SIGDescribe("Device Plugin Multiple", framework.WithSerial(), feature.DevicePlugin, func() { + f := framework.NewDefaultFramework("device-plugin-errors") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + testDevicePluginMultiple(f, kubeletdevicepluginv1beta1.DevicePluginPath) +}) + +func testDevicePluginMultiple(f *framework.Framework, pluginSockDir string) { + pluginSockDir = filepath.Clean(pluginSockDir) + "/" + + f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { + var devicePluginPod, devicePluginPod2 *v1.Pod + var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse + var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse + var err error + + ginkgo.BeforeEach(func(ctx context.Context) { + ginkgo.By("Wait for node to be ready") + gomega.Eventually(ctx, func(ctx context.Context) bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrueBecause("expected node to be ready")) + + // Before we run the device plugin test, we need to ensure + // that the cluster is in a clean state and there are no + // pods running on this node. 
+ // This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of its resources may still be in progress. + // xref: https://issue.k8s.io/115381 + gomega.Eventually(ctx, func(ctx context.Context) error { + v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %w", err) + } + + v1PodResources, err = getV1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %w", err) + } + + if len(v1alphaPodResources.PodResources) > 0 { + return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources) + } + + if len(v1PodResources.PodResources) > 0 { + return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) + } + return nil + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) + + ginkgo.By("Scheduling a sample device plugin pod") + dp := getSampleDevicePluginPod(pluginSockDir, "dp1") + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + + ginkgo.By("Waiting for devices to become available on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) > 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be available on local node")) + framework.Logf("Successfully created device plugin pod") + + ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", e2enode.SampleDevsAmount)) + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && +
e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && + e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected resource to be available on local node")) + }) + + ginkgo.AfterEach(func(ctx context.Context) { + ginkgo.By("Deleting the device plugin pods") + if devicePluginPod != nil { + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + if devicePluginPod2 != nil { + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + + restartKubelet(ctx, true) + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) <= 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be unavailable on local node")) + + ginkgo.By("devices now unavailable on the local node") + }) + + // Pod1 scheduled with DP1, DP2 appears, Pod2 scheduled successfully with both DPs running, DP1 is deleted, + // Pod3 is scheduled successfully after DP1 is removed. 
+ ginkgo.It("Device Plugin Multiple: Basic rollout of device plugins with zero downtime", func(ctx context.Context) { + ginkgo.By("Scheduling Pod1 with DP1 successfully") + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + pod1, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Creating DP2") + dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2") + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears")) + + ginkgo.By("Verifying DP2 is running") + dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling Pod2 while both are running") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod2 is running") + pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Deleting DP1") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, 
devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + + ginkgo.By("Waiting for DP1 to unregister, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices to decrease after DP1 disappears")) + + ginkgo.By("Deleting Pod1 to free up resources") + e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + + ginkgo.By("Scheduling Pod3 successfully after DP1 disappeared") + pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod3 is running") + pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning)) + }) + + // Pod1 scheduled with DP1, DP2 appears after 30 seconds, Pod2 scheduled successfully before DP2 connected, Pod3 scheduled successfully after DP2 connected. 
+ ginkgo.It("Device Plugin Multiple: DP2 takes long time to start working", func(ctx context.Context) { + var err error + ginkgo.By("Scheduling Pod1 with DP1 successfully") + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling DP2 with a delay") + dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2") + dp2.Spec.Containers[0].Command = []string{"/bin/sh", "-c"} + dp2.Spec.Containers[0].Args = []string{"sleep 30 && exec /sampledeviceplugin -alsologtostderr"} + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + ginkgo.By("Scheduling Pod2 successfully") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod2 is running") + pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears")) + + ginkgo.By("Verifying DP2 is running") + dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + 
ginkgo.By("Deleting Pod1 to free up resources") + e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + + ginkgo.By("Scheduling Pod3 after DP2 registration succeeds") + pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod3 is running") + pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning)) + }) + + // Pod1 scheduled with DP1, DP2 appears, Pod2 scheduled successfully with both DPs running, + // DP2 is deleted, Pod3 is scheduled successfully after DP2 is removed. + ginkgo.It("Device Plugin Multiple: DP2 changed its mind", func(ctx context.Context) { + var err error + ginkgo.By("Scheduling Pod1 with DP1 successfully") + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling DP2") + dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2") + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears")) + + ginkgo.By("Verifying DP2 is running") + dp2Pod, err := 
e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling Pod2 while both are running") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod2 is running") + pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Deleting DP2") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) + + ginkgo.By("Waiting for DP2 to unregister, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 disappears")) + + ginkgo.By("Deleting Pod1 to free up resources") + e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + + ginkgo.By("Scheduling Pod3 successfully after DP2 disappeared") + pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod3 is running") + pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning)) + }) + }) +} + +// This is a quick fix to allow duplicated device plugin otherwise, because CDI is hardcoded, +// we need another device plugin yaml. 
+// getSampleDevicePluginPodMultiple returns the Sample Device Plugin pod with CDI disabled. +func getSampleDevicePluginPodMultiple(pluginSockDir string, version string) *v1.Pod { + dp := getSampleDevicePluginPod(pluginSockDir, version) + for i := range dp.Spec.Containers[0].Env { + if dp.Spec.Containers[0].Env[i].Name == "CDI_ENABLED" { + dp.Spec.Containers[0].Env[i].Value = "" + } + } + return dp +} diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index 9a458b6ade4..29264378050 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -130,7 +130,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { } f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { - var devicePluginPod, devicePluginPod2, dptemplate, dptemplate2 *v1.Pod + var devicePluginPod, dptemplate *v1.Pod var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse var err error @@ -170,12 +170,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) ginkgo.By("Scheduling a sample device plugin pod") - dp := getSampleDevicePluginPod(pluginSockDir, "dp1") - dp2 := getSampleDevicePluginPod(pluginSockDir, "dp2") + dp := getSampleDevicePluginPod(pluginSockDir, "") dptemplate = dp.DeepCopy() - dptemplate2 = dp2.DeepCopy() devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) - devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) ginkgo.By("Waiting for devices to become available on the local node") gomega.Eventually(ctx, func(ctx context.Context) bool { @@ -196,7 +193,6 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.AfterEach(func(ctx context.Context) { ginkgo.By("Deleting the device plugin pod") e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, 
f.Timeouts.PodDelete) - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) ginkgo.By("Deleting any Pods created by the test") l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) @@ -240,8 +236,8 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.Logf("len(v1alphaPodResources.PodResources):%+v", len(v1alphaPodResources.PodResources)) framework.Logf("len(v1PodResources.PodResources):%+v", len(v1PodResources.PodResources)) - gomega.Expect(v1alphaPodResources.PodResources).To(gomega.HaveLen(3)) - gomega.Expect(v1PodResources.PodResources).To(gomega.HaveLen(3)) + gomega.Expect(v1alphaPodResources.PodResources).To(gomega.HaveLen(2)) + gomega.Expect(v1PodResources.PodResources).To(gomega.HaveLen(2)) var v1alphaResourcesForOurPod *kubeletpodresourcesv1alpha1.PodResources for _, res := range v1alphaPodResources.GetPodResources() { @@ -346,124 +342,6 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) - ginkgo.It("Keeps device plugin assignments after delete the first device plugin", func(ctx context.Context) { - podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) - pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) - deviceIDRE := "stub devices: (Dev-[0-9]+)" - devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) - framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) - gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") - - ginkgo.By("Deleting the first device plugin") - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) - waitForContainerRemoval(ctx, 
devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) - - ginkgo.By("Verifying the device assignment is preserved after deleting one device plugin") - pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) - framework.ExpectNoError(err) - gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) - - v1PodResources, err = getV1NodeDevices(ctx) - framework.ExpectNoError(err) - err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after deleting second device plugin") - }) - - ginkgo.It("Keeps device plugin assignments after delete the second device plugin", func(ctx context.Context) { - podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) - pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) - deviceIDRE := "stub devices: (Dev-[0-9]+)" - devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) - framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) - gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") - - ginkgo.By("Deleting the second device plugin") - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) - waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) - - ginkgo.By("Verifying the device assignment is preserved after deleting one device plugin") - pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) - framework.ExpectNoError(err) - gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) - - v1PodResources, err = getV1NodeDevices(ctx) - framework.ExpectNoError(err) 
- err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after deleting second device plugin") - }) - - ginkgo.It("Keeps device plugin assignments after the second device plugin is disconnected and reconnected", func(ctx context.Context) { - // Recreate dp2 with a 10s startup delay exclusively for this test case - // so we have a realistic disconnected window. - err := e2epod.NewPodClient(f).Delete(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) - - dp2 := getSampleDevicePluginPod(pluginSockDir, "dp2") - dp2.Spec.Containers[0].Command = []string{"/bin/sh", "-c"} - dp2.Spec.Containers[0].Args = []string{"sleep 10 && exec /sampledeviceplugin -alsologtostderr"} - devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) - - gomega.Eventually(ctx, func(ctx context.Context) bool { - node, ready := getLocalTestNode(ctx, f) - return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount - }, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be available on local node")) - - podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) - pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) - deviceIDRE := "stub devices: (Dev-[0-9]+)" - devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) - framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) - gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") - - ginkgo.By("Making the second device plugin fail to respond and reconnect by killing its 
process") - dpPod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) - framework.ExpectNoError(err) - initialRestartCount := dpPod.Status.ContainerStatuses[0].RestartCount - - _, _, err = e2epod.ExecCommandInContainerWithFullOutput(f, devicePluginPod2.Name, devicePluginPod2.Spec.Containers[0].Name, "kill", "1") - framework.ExpectNoError(err, "killing sampledeviceplugin process") - - ginkgo.By("Verifying that the plugin is disconnected and original device assignments are preserved") - time.Sleep(5 * time.Second) - pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) - framework.ExpectNoError(err) - gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) - - v1PodResources, err := getV1NodeDevices(ctx) - framework.ExpectNoError(err) - err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, e2enode.SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after disconnecting the second DP") - - ginkgo.By("Waiting for container to restart and reconnect") - gomega.Eventually(ctx, func() int { - p, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) - if err != nil || len(p.Status.ContainerStatuses) == 0 { - return 0 - } - return int(p.Status.ContainerStatuses[0].RestartCount) - }, 5*time.Minute, framework.Poll).Should(gomega.BeNumerically(">", initialRestartCount)) - - ginkgo.By("Verifying the new plugin reconnected, original device assignments are preserved") - time.Sleep(10 * time.Second) - - pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) - framework.ExpectNoError(err) - gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) - - v1PodResources, err = getV1NodeDevices(ctx) - framework.ExpectNoError(err) - err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, 
e2enode.SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after reconnecting the second DP") - - gomega.Eventually(ctx, func() bool { - node, ready := getLocalTestNode(ctx, f) - return ready && - e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && - e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount - }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected resource to be available on local node")) - }) - // simulate kubelet restart. A compliant device plugin is expected to re-register, while the pod and the container stays running. // The flow with buggy or slow device plugin is deferred to another test. // The device assignment should be kept and be stable across the kubelet restart, because it's the kubelet which performs the device allocation, @@ -609,8 +487,6 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { gomega.Expect(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace)).To(gomega.Succeed()) ginkgo.By("Deleting the device plugin") - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) - waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) @@ -660,16 +536,11 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { } e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, f.Timeouts.PodDelete) waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, deleteOptions, f.Timeouts.PodDelete) - 
waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) ginkgo.By("Recreating the plugin pod") - devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) - err1 := e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod2.Name, devicePluginPod2.Namespace, 1*time.Minute) - devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate2) - err2 := e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) - framework.ExpectNoError(err1) - framework.ExpectNoError(err2) + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) + err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) + framework.ExpectNoError(err) ginkgo.By("Waiting for resource to become available on the local node after re-registration") gomega.Eventually(ctx, func() bool { @@ -742,12 +613,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { } e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, f.Timeouts.PodDelete) waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, deleteOptions, f.Timeouts.PodDelete) - waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) ginkgo.By("Recreating the plugin pod") devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) - devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate2) ginkgo.By("Waiting for resource to become available on the local node after restart") gomega.Eventually(ctx, func() bool { @@ -887,7 +755,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.Logf("PodResources.PodResources:%+v\n", 
podResources.PodResources) framework.Logf("len(PodResources.PodResources):%+v", len(podResources.PodResources)) - gomega.Expect(podResources.PodResources).To(gomega.HaveLen(3)) + gomega.Expect(podResources.PodResources).To(gomega.HaveLen(2)) var resourcesForOurPod *kubeletpodresourcesv1.PodResources for _, res := range podResources.GetPodResources() { @@ -922,7 +790,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { - var devicePluginPod, devicePluginPod2 *v1.Pod + var devicePluginPod *v1.Pod var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse var triggerPathFile, triggerPathDir string var err error @@ -995,27 +863,14 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { Spec: ds.Spec.Template.Spec, } - dp2 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: e2enode.SampleDevicePluginName + "-2", - }, - Spec: ds.Spec.Template.Spec, - } - - devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) framework.Logf("Waiting for device plugin pod to be Running") - err1 := e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPod.Namespace, devicePluginPod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) - err2 := e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPod2.Namespace, devicePluginPod2.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) - if err1 != nil { - framework.Logf("Sample Device Pod %v took too long to enter running/ready: %v", dp.Name, err1) + err = e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPod.Namespace, devicePluginPod.Name, "Ready", 2*time.Minute, testutils.PodRunningReady) + if err != nil { + framework.Logf("Sample Device Pod %v took too long to enter running/ready: %v", dp.Name, err) } - if err2 != nil { - framework.Logf("Sample 
Device Pod %v took too long to enter running/ready: %v", dp2.Name, err2) - } - framework.ExpectNoError(err1, "WaitForPodCondition() failed err: %v", err1) - framework.ExpectNoError(err2, "WaitForPodCondition() failed err: %v", err2) + framework.ExpectNoError(err, "WaitForPodCondition() failed err: %v", err) go func() { // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE @@ -1049,7 +904,6 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { ginkgo.AfterEach(func(ctx context.Context) { ginkgo.By("Deleting the device plugin pod") e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) ginkgo.By("Deleting any Pods created by the test") l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) @@ -1257,16 +1111,20 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta } // getSampleDevicePluginPod returns the Sample Device Plugin pod to be used e2e tests. 
-func getSampleDevicePluginPod(pluginSockDir string, uniqueName string) *v1.Pod { +func getSampleDevicePluginPod(pluginSockDir string, version string) *v1.Pod { data, err := e2etestfiles.Read(e2enode.SampleDevicePluginDSYAML) if err != nil { framework.Fail(err.Error()) } ds := readDaemonSetV1OrDie(data) + podName := e2enode.SampleDevicePluginName + if version != "" { + podName = podName + "-" + version + } dp := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: e2enode.SampleDevicePluginName + "-" + uniqueName, + Name: podName, }, Spec: ds.Spec.Template.Spec, } @@ -1277,7 +1135,9 @@ func getSampleDevicePluginPod(pluginSockDir string, uniqueName string) *v1.Pod { } dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "CDI_ENABLED", Value: "1"}) - dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "UNIQUE_NAME", Value: uniqueName}) + if version != "" { + dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "UNIQUE_NAME", Value: version}) + } return dp }