diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go index f9f0f2845be..c0b65ed69a3 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint.go +++ b/pkg/kubelet/cm/devicemanager/endpoint.go @@ -36,11 +36,13 @@ type endpoint interface { setStopTime(t time.Time) isStopped() bool stopGracePeriodExpired() bool + socketPath() string } type endpointImpl struct { mutex sync.Mutex resourceName string + socket string api pluginapi.DevicePluginClient stopTime time.Time client plugin.Client // for testing only @@ -52,6 +54,7 @@ func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl { return &endpointImpl{ api: p.API(), resourceName: p.Resource(), + socket: p.SocketPath(), } } @@ -64,6 +67,10 @@ func newStoppedEndpointImpl(resourceName string) *endpointImpl { } } +func (e *endpointImpl) socketPath() string { + return e.socket +} + func (e *endpointImpl) isStopped() bool { e.mutex.Lock() defer e.mutex.Unlock() diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index 726dd7f4342..ecc52da3715 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -43,7 +43,7 @@ func newMockPluginManager() *mockPluginManager { return &mockPluginManager{ func(string) error { return nil }, func(string, plugin.DevicePlugin) error { return nil }, - func(klog.Logger, string) {}, + func(klog.Logger, string, string) {}, func(string, *pluginapi.ListAndWatchResponse) {}, } } @@ -51,7 +51,7 @@ func newMockPluginManager() *mockPluginManager { type mockPluginManager struct { cleanupPluginDirectory func(string) error pluginConnected func(string, plugin.DevicePlugin) error - pluginDisconnected func(klog.Logger, string) + pluginDisconnected func(klog.Logger, string, string) pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse) } @@ -63,8 +63,8 @@ func (m *mockPluginManager) PluginConnected(_ context.Context, r string, p plugi return m.pluginConnected(r, p) } -func (m *mockPluginManager) PluginDisconnected(logger klog.Logger, r string) { - m.pluginDisconnected(logger, r) +func (m *mockPluginManager) PluginDisconnected(logger klog.Logger, r string, s string) { + m.pluginDisconnected(logger, r, s) } func (m *mockPluginManager) PluginListAndWatchReceiver(_ klog.Logger, r string, lr *pluginapi.ListAndWatchResponse) { @@ -288,7 +288,7 @@ func esetup(ctx context.Context, t *testing.T, devs []*pluginapi.Device, socket, e := newEndpointImpl(dp) e.client = c - m.pluginDisconnected = func(logger klog.Logger, r string) { + m.pluginDisconnected = func(logger klog.Logger, r string, s string) { e.setStopTime(time.Now()) } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index cd796773a2c..b10f4b38366 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -62,6 +62,8 @@ type ActivePodsFunc func() []*v1.Pod type ManagerImpl struct { checkpointdir string + endpointStore map[string]map[string]*endpointInfo // resourceName -> socketPath -> endpointInfo + endpoints map[string]endpointInfo // Key is ResourceName mutex sync.Mutex @@ -149,8 +151,8 @@ func newManagerImpl(logger klog.Logger, socketPath string, topology []cadvisorap } manager := &ManagerImpl{ - endpoints: make(map[string]endpointInfo), - + endpoints: make(map[string]endpointInfo), + endpointStore: make(map[string]map[string]*endpointInfo), allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.Set[string]), unhealthyDevices: make(map[string]sets.Set[string]), @@ -236,7 +238,17 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string, m.mutex.Lock() defer m.mutex.Unlock() + if m.endpointStore == nil { + m.endpointStore = make(map[string]map[string]*endpointInfo) + } + if m.endpointStore[resourceName] == nil { + m.endpointStore[resourceName] = make(map[string]*endpointInfo) + } + if _, exists := m.endpointStore[resourceName][e.socketPath()]; exists { + return fmt.Errorf("device plugin already connected: %s", e.socketPath()) + } m.endpoints[resourceName] = endpointInfo{e, options} + m.endpointStore[resourceName][e.socketPath()] = &endpointInfo{e, options} logger.V(2).Info("Device plugin connected", "resourceName", resourceName) return nil @@ -244,15 +256,32 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string, // PluginDisconnected is to disconnect a plugin from an endpoint. // This is done as part of device plugin deregistration. -func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string) { +func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string, socketPath string) { m.mutex.Lock() defer m.mutex.Unlock() - if ep, exists := m.endpoints[resourceName]; exists { - m.markResourceUnhealthy(logger, resourceName) - logger.V(2).Info("Endpoint became unhealthy", "resourceName", resourceName) + endpoints, ok := m.endpointStore[resourceName] + if !ok { + return + } + ep, exists := endpoints[socketPath] + if !exists { + return + } + ep.e.setStopTime(time.Now()) - ep.e.setStopTime(time.Now()) + last := len(endpoints) == 1 + if last { + delete(m.endpointStore, resourceName) + m.markResourceUnhealthy(logger, resourceName) + logger.V(2).Info("Last device plugin for this resource disconnected", "resource", resourceName) + } else { + delete(endpoints, socketPath) + // Promote an arbitrary remaining endpoint to the primary endpoints map + for _, remainingEp := range endpoints { + m.endpoints[resourceName] = *remainingEp + break + } } } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 38add8678e7..13c5181e01c 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -432,9 +432,11 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Tests adding another resource. resourceName2 := "resource2" - e2 := &endpointImpl{} + e2 := &endpointImpl{socket: socketName} e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager) - testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} + eInfo := endpointInfo{e: e2, opts: nil} + testManager.endpoints[resourceName2] = eInfo + testManager.endpointStore[resourceName2] = map[string]*endpointInfo{socketName: &eInfo} callback(logger, resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() as.Len(capacity, 2) @@ -808,6 +810,7 @@ type MockEndpoint struct { getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error) initChan chan []string + socket string } func (m *MockEndpoint) preStartContainer(_ context.Context, devs []string) (*pluginapi.PreStartContainerResponse, error) { @@ -835,6 +838,10 @@ func (m *MockEndpoint) isStopped() bool { return false } func (m *MockEndpoint) stopGracePeriodExpired() bool { return false } +func (m *MockEndpoint) socketPath() string { + return m.socket +} + func makePod(limits v1.ResourceList) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -2160,9 +2167,12 @@ func TestEndpointSyncOnDisconnect(t *testing.T) { resourceName: resourceName, client: plugin.NewPluginClient(resourceName, socketName, manager), stopTime: time.Now().Add(-endpointStopGracePeriod * 2), // make the grace period expired + socket: socketName, } - manager.endpoints[resourceName] = endpointInfo{e: ep, opts: nil} + eInfo := endpointInfo{e: ep, opts: nil} + manager.endpoints[resourceName] = eInfo + manager.endpointStore[resourceName] = map[string]*endpointInfo{socketName: &eInfo} devs := []*pluginapi.Device{ {ID: "Device1", Health: pluginapi.Healthy}, {ID: "Device2", Health: pluginapi.Healthy}, diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go index b400a212564..122796138dc 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/api.go @@ -32,7 +32,7 @@ type RegistrationHandler interface { // ClientHandler is an interface for handling device plugin connections. type ClientHandler interface { PluginConnected(context.Context, string, DevicePlugin) error - PluginDisconnected(klog.Logger, string) + PluginDisconnected(klog.Logger, string, string) PluginListAndWatchReceiver(klog.Logger, string, *api.ListAndWatchResponse) } diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go index a6e0e79c10f..9655086f6ca 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go @@ -42,6 +42,7 @@ type Client interface { Connect(context.Context) error Run(context.Context) Disconnect(klog.Logger) error + SocketPath() string } type client struct { @@ -109,7 +110,7 @@ func (c *client) Disconnect(logger klog.Logger) error { c.grpc = nil } c.mutex.Unlock() - c.handler.PluginDisconnected(logger, c.resource) + c.handler.PluginDisconnected(logger, c.resource, c.socket) logger.V(2).Info("Device plugin disconnected", "resource", c.resource) return nil diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index 472399ffe6d..990c870b1e1 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -54,10 +54,11 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s func (s *server) DeRegisterPlugin(pluginName, endpoint string) { logger := klog.FromContext(context.TODO()) logger.V(2).Info("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint) - client := s.getClient(pluginName) + // endpoint in DeRegisterPlugin is the socket path + client := s.getClient(pluginName, endpoint) if client != nil { if err := s.disconnectClient(logger, pluginName, client); err != nil { - logger.Error(err, "disconnecting client", "plugin", pluginName, "endpoing", endpoint) + logger.Error(err, "disconnecting client", "plugin", pluginName, "endpoint", endpoint) } } } @@ -86,12 +87,13 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri s.registerClient(logger, name, c) if err := c.Connect(ctx); err != nil { - s.deregisterClient(logger, name) - logger.Error(err, "Failed to connect to new client", "resource", name) + // Need to re-connect the client if connection fails + s.deregisterClient(logger, name, socketPath) + logger.Error(err, "Failed to connect to new client", "resource", name, "socketPath", socketPath) return err } - logger.V(2).Info("Connected to new client", "resource", name) + logger.V(2).Info("Connected to new client", "resource", name, "socketPath", socketPath) go func() { s.runClient(ctx, name, c) }() @@ -100,30 +102,46 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri } func (s *server) disconnectClient(logger klog.Logger, name string, c Client) error { - s.deregisterClient(logger, name) + s.deregisterClient(logger, name, c.SocketPath()) return c.Disconnect(logger) } func (s *server) registerClient(logger klog.Logger, name string, c Client) { s.mutex.Lock() defer s.mutex.Unlock() - s.clients[name] = c - logger.V(2).Info("Registered client", "name", name) + s.clients[name] = append(s.clients[name], c) + logger.V(2).Info("Registered client", "name", name, "socketPath", c.SocketPath()) } -func (s *server) deregisterClient(logger klog.Logger, name string) { +func (s *server) deregisterClient(logger klog.Logger, name string, socketPath string) { s.mutex.Lock() defer s.mutex.Unlock() - delete(s.clients, name) - logger.V(2).Info("Deregistered client", "name", name) + // When a client is deregistered, we will rebuild the clients array, removing the given client. + // We intentionally avoid mutating in place. + // We only remove the connection when both the client name and socket path matches. + // This ensures if there is two connections with same client name, only that specific client is removed. + var newClients []Client + for _, c := range s.clients[name] { + if c.SocketPath() == socketPath { + logger.V(2).Info("Deregistered client", "name", name, "socketPath", socketPath) + continue + } + newClients = append(newClients, c) + } + + if len(newClients) == 0 { + delete(s.clients, name) + } else { + s.clients[name] = newClients + } } func (s *server) runClient(ctx context.Context, name string, c Client) { logger := klog.FromContext(ctx) c.Run(ctx) - c = s.getClient(name) + c = s.getClient(name, c.SocketPath()) if c == nil { return } @@ -133,8 +151,13 @@ func (s *server) runClient(ctx context.Context, name string, c Client) { } } -func (s *server) getClient(name string) Client { +func (s *server) getClient(name string, socketPath string) Client { s.mutex.Lock() defer s.mutex.Unlock() - return s.clients[name] + for _, c := range s.clients[name] { + if c.SocketPath() == socketPath { + return c + } + } + return nil } diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go index 320ee4d80ea..6372469c1fb 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go @@ -55,7 +55,7 @@ type server struct { grpc *grpc.Server rhandler RegistrationHandler chandler ClientHandler - clients map[string]Client + clients map[string][]Client // lastError records the last runtime error. A server is considered healthy till an actual error occurs. lastError error @@ -77,7 +77,7 @@ func NewServer(logger klog.Logger, socketPath string, rh RegistrationHandler, ch socketDir: dir, rhandler: rh, chandler: ch, - clients: make(map[string]Client), + clients: make(map[string][]Client), } return s, nil @@ -198,10 +198,12 @@ func (s *server) isVersionCompatibleWithPlugin(versions ...string) bool { func (s *server) visitClients(visit func(r string, c Client)) { s.mutex.Lock() - for r, c := range s.clients { - s.mutex.Unlock() - visit(r, c) - s.mutex.Lock() + for r, cs := range s.clients { + for _, c := range cs { + s.mutex.Unlock() + visit(r, c) + s.mutex.Lock() + } } s.mutex.Unlock() } diff --git a/test/e2e_node/device_plugin_multiple_test.go b/test/e2e_node/device_plugin_multiple_test.go new file mode 100644 index 00000000000..334e8145a42 --- /dev/null +++ b/test/e2e_node/device_plugin_multiple_test.go @@ -0,0 +1,312 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2enode + +import ( + "context" + "fmt" + "path/filepath" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" + "k8s.io/kubernetes/test/e2e/feature" + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + admissionapi "k8s.io/pod-security-admission/api" +) + +// Serial because the test restarts Kubelet +var _ = SIGDescribe("Device Plugin Multiple", framework.WithSerial(), feature.DevicePlugin, func() { + f := framework.NewDefaultFramework("device-plugin-errors") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + testDevicePluginMultiple(f, kubeletdevicepluginv1beta1.DevicePluginPath) +}) + +func testDevicePluginMultiple(f *framework.Framework, pluginSockDir string) { + pluginSockDir = filepath.Clean(pluginSockDir) + "/" + + f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { + var devicePluginPod, devicePluginPod2 *v1.Pod + var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse + var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse + var err error + + ginkgo.BeforeEach(func(ctx context.Context) { + ginkgo.By("Wait for node to be ready") + gomega.Eventually(ctx, func(ctx context.Context) bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrueBecause("expected node to be ready")) + + // Before we run the device plugin test, we need to ensure + // that the cluster is in a clean state and there are no + // pods running on this node. + // This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of it's resources may still be in progress. + // xref: https://issue.k8s.io/115381 + gomega.Eventually(ctx, func(ctx context.Context) error { + v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %w", err) + } + + v1PodResources, err = getV1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %w", err) + } + + if len(v1alphaPodResources.PodResources) > 0 { + return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources) + } + + if len(v1PodResources.PodResources) > 0 { + return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) + } + return nil + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) + + ginkgo.By("Scheduling a sample device plugin pod") + dp := getSampleDevicePluginPod(pluginSockDir, "dp1") + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + + ginkgo.By("Waiting for devices to become available on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) > 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be available on local node")) + framework.Logf("Successfully created device plugin pod") + + ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", e2enode.SampleDevsAmount)) + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && + e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected resource to be available on local node")) + }) + + ginkgo.AfterEach(func(ctx context.Context) { + ginkgo.By("Deleting the device plugin pods") + if devicePluginPod != nil { + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + if devicePluginPod2 != nil { + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + + restartKubelet(ctx, true) + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) <= 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be unavailable on local node")) + + ginkgo.By("devices now unavailable on the local node") + }) + + // Pod1 scheduled with DP1, DP2 appears, Pod2 scheduled successfully with both DPs running, DP1 is deleted, + // Pod3 is scheduled successfully after DP1 is removed. + ginkgo.It("Device Plugin Multiple: Basic rollout of device plugins with zero downtime", func(ctx context.Context) { + ginkgo.By("Scheduling Pod1 with DP1 successfully") + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + pod1, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Creating DP2") + dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2") + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears")) + + ginkgo.By("Verifying DP2 is running") + dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling Pod2 while both are running") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod2 is running") + pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Deleting DP1") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + + ginkgo.By("Waiting for DP1 to unregister, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices to decrease after DP1 disappears")) + + ginkgo.By("Deleting Pod1 to free up resources") + e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + + ginkgo.By("Scheduling Pod3 successfully after DP1 disappeared") + pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod3 is running") + pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning)) + }) + + // Pod1 scheduled with DP1, DP2 appears after 30 seconds, Pod2 scheduled successfully before DP2 connected, Pod3 scheduled successfully after DP2 connected. + ginkgo.It("Device Plugin Multiple: DP2 takes long time to start working", func(ctx context.Context) { + var err error + ginkgo.By("Scheduling Pod1 with DP1 successfully") + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling DP2 with a delay") + dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2") + dp2.Spec.Containers[0].Command = []string{"/bin/sh", "-c"} + dp2.Spec.Containers[0].Args = []string{"sleep 30 && exec /sampledeviceplugin -alsologtostderr"} + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + ginkgo.By("Scheduling Pod2 successfully") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod2 is running") + pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears")) + + ginkgo.By("Verifying DP2 is running") + dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Deleting Pod1 to free up resources") + e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + + ginkgo.By("Scheduling Pod3 after DP2 registration succeeds") + pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod3 is running") + pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning)) + }) + + // Pod1 scheduled with DP1, DP2 appears, Pod2 scheduled successfully with both DPs running, + // DP2 is deleted, Pod3 is scheduled successfully after DP2 is removed. + ginkgo.It("Device Plugin Multiple: DP2 changed its mind", func(ctx context.Context) { + var err error + ginkgo.By("Scheduling Pod1 with DP1 successfully") + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling DP2") + dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2") + devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2) + + ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears")) + + ginkgo.By("Verifying DP2 is running") + dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Scheduling Pod2 while both are running") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod2 is running") + pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning)) + + ginkgo.By("Deleting DP2") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace) + + ginkgo.By("Waiting for DP2 to unregister, and number of devices to remain unchanged") + gomega.Consistently(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 disappears")) + + ginkgo.By("Deleting Pod1 to free up resources") + e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + + ginkgo.By("Scheduling Pod3 successfully after DP2 disappeared") + pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD)) + + ginkgo.By("Verifying Pod3 is running") + pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning)) + }) + }) +} + +// This is a quick fix to allow duplicated device plugin otherwise, because CDI is hardcoded, +// we need another device plugin yaml. +// getSampleDevicePluginPodMultiple returns the Sample Device Plugin pod with CDI disabled. +func getSampleDevicePluginPodMultiple(pluginSockDir string, version string) *v1.Pod { + dp := getSampleDevicePluginPod(pluginSockDir, version) + for i := range dp.Spec.Containers[0].Env { + if dp.Spec.Containers[0].Env[i].Name == "CDI_ENABLED" { + dp.Spec.Containers[0].Env[i].Value = "" + } + } + return dp +} diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index bd33b74014e..fd9bfbfb418 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -170,7 +170,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) ginkgo.By("Scheduling a sample device plugin pod") - dp := getSampleDevicePluginPod(pluginSockDir) + dp := getSampleDevicePluginPod(pluginSockDir, "") dptemplate = dp.DeepCopy() devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) @@ -1104,16 +1104,20 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta } // getSampleDevicePluginPod returns the Sample Device Plugin pod to be used e2e tests. -func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { +func getSampleDevicePluginPod(pluginSockDir string, version string) *v1.Pod { data, err := e2etestfiles.Read(e2enode.SampleDevicePluginDSYAML) if err != nil { framework.Fail(err.Error()) } ds := readDaemonSetV1OrDie(data) + podName := e2enode.SampleDevicePluginName + if version != "" { + podName = podName + "-" + version + } dp := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: e2enode.SampleDevicePluginName, + Name: podName, }, Spec: ds.Spec.Template.Spec, } @@ -1124,6 +1128,9 @@ func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { } dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "CDI_ENABLED", Value: "1"}) + if version != "" { + dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "UNIQUE_NAME", Value: version}) + } return dp } diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go index 4686a5aa54b..309f178397a 100644 --- a/test/e2e_node/podresources_test.go +++ b/test/e2e_node/podresources_test.go @@ -2118,7 +2118,7 @@ func getOnlineCPUs() (cpuset.CPUSet, error) { func setupSampleDevicePluginOrFail(ctx context.Context, f *framework.Framework) *v1.Pod { e2enode.WaitForNodeToBeReady(ctx, f.ClientSet, framework.TestContext.NodeName, 5*time.Minute) - dp := getSampleDevicePluginPod(kubeletdevicepluginv1beta1.DevicePluginPath) + dp := getSampleDevicePluginPod(kubeletdevicepluginv1beta1.DevicePluginPath, "dp") dp.Spec.NodeName = framework.TestContext.NodeName ginkgo.By("Create the sample device plugin pod")