Merge pull request #135437 from zxqlxy/device-plugin-fix

DRA-like fix for device-plugin race condition problem
This commit is contained in:
Kubernetes Prow Robot 2026-05-02 10:01:24 +05:30 committed by GitHub
commit bb0bcb8a85
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 432 additions and 41 deletions

View file

@ -36,11 +36,13 @@ type endpoint interface {
setStopTime(t time.Time)
isStopped() bool
stopGracePeriodExpired() bool
socketPath() string
}
type endpointImpl struct {
mutex sync.Mutex
resourceName string
socket string
api pluginapi.DevicePluginClient
stopTime time.Time
client plugin.Client // for testing only
@ -52,6 +54,7 @@ func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl {
return &endpointImpl{
api: p.API(),
resourceName: p.Resource(),
socket: p.SocketPath(),
}
}
@ -64,6 +67,10 @@ func newStoppedEndpointImpl(resourceName string) *endpointImpl {
}
}
// socketPath returns the value of the endpoint's socket field.
// The field is read directly, without taking e.mutex.
func (e *endpointImpl) socketPath() string {
return e.socket
}
func (e *endpointImpl) isStopped() bool {
e.mutex.Lock()
defer e.mutex.Unlock()

View file

@ -43,7 +43,7 @@ func newMockPluginManager() *mockPluginManager {
return &mockPluginManager{
func(string) error { return nil },
func(string, plugin.DevicePlugin) error { return nil },
func(klog.Logger, string) {},
func(klog.Logger, string, string) {},
func(string, *pluginapi.ListAndWatchResponse) {},
}
}
@ -51,7 +51,7 @@ func newMockPluginManager() *mockPluginManager {
type mockPluginManager struct {
cleanupPluginDirectory func(string) error
pluginConnected func(string, plugin.DevicePlugin) error
pluginDisconnected func(klog.Logger, string)
pluginDisconnected func(klog.Logger, string, string)
pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse)
}
@ -63,8 +63,8 @@ func (m *mockPluginManager) PluginConnected(_ context.Context, r string, p plugi
return m.pluginConnected(r, p)
}
func (m *mockPluginManager) PluginDisconnected(logger klog.Logger, r string) {
m.pluginDisconnected(logger, r)
func (m *mockPluginManager) PluginDisconnected(logger klog.Logger, r string, s string) {
m.pluginDisconnected(logger, r, s)
}
func (m *mockPluginManager) PluginListAndWatchReceiver(_ klog.Logger, r string, lr *pluginapi.ListAndWatchResponse) {
@ -288,7 +288,7 @@ func esetup(ctx context.Context, t *testing.T, devs []*pluginapi.Device, socket,
e := newEndpointImpl(dp)
e.client = c
m.pluginDisconnected = func(logger klog.Logger, r string) {
m.pluginDisconnected = func(logger klog.Logger, r string, s string) {
e.setStopTime(time.Now())
}

View file

@ -62,6 +62,8 @@ type ActivePodsFunc func() []*v1.Pod
type ManagerImpl struct {
checkpointdir string
endpointStore map[string]map[string]*endpointInfo // resourceName -> socketPath -> endpointInfo
endpoints map[string]endpointInfo // Key is ResourceName
mutex sync.Mutex
@ -149,8 +151,8 @@ func newManagerImpl(logger klog.Logger, socketPath string, topology []cadvisorap
}
manager := &ManagerImpl{
endpoints: make(map[string]endpointInfo),
endpoints: make(map[string]endpointInfo),
endpointStore: make(map[string]map[string]*endpointInfo),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.Set[string]),
unhealthyDevices: make(map[string]sets.Set[string]),
@ -236,7 +238,17 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string,
m.mutex.Lock()
defer m.mutex.Unlock()
if m.endpointStore == nil {
m.endpointStore = make(map[string]map[string]*endpointInfo)
}
if m.endpointStore[resourceName] == nil {
m.endpointStore[resourceName] = make(map[string]*endpointInfo)
}
if _, exists := m.endpointStore[resourceName][e.socketPath()]; exists {
return fmt.Errorf("device plugin already connected: %s", e.socketPath())
}
m.endpoints[resourceName] = endpointInfo{e, options}
m.endpointStore[resourceName][e.socketPath()] = &endpointInfo{e, options}
logger.V(2).Info("Device plugin connected", "resourceName", resourceName)
return nil
@ -244,15 +256,32 @@ func (m *ManagerImpl) PluginConnected(ctx context.Context, resourceName string,
// PluginDisconnected is to disconnect a plugin from an endpoint.
// This is done as part of device plugin deregistration.
func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string) {
func (m *ManagerImpl) PluginDisconnected(logger klog.Logger, resourceName string, socketPath string) {
m.mutex.Lock()
defer m.mutex.Unlock()
if ep, exists := m.endpoints[resourceName]; exists {
m.markResourceUnhealthy(logger, resourceName)
logger.V(2).Info("Endpoint became unhealthy", "resourceName", resourceName)
endpoints, ok := m.endpointStore[resourceName]
if !ok {
return
}
ep, exists := endpoints[socketPath]
if !exists {
return
}
ep.e.setStopTime(time.Now())
ep.e.setStopTime(time.Now())
last := len(endpoints) == 1
if last {
delete(m.endpointStore, resourceName)
m.markResourceUnhealthy(logger, resourceName)
logger.V(2).Info("Last device plugin for this resource disconnected", "resource", resourceName)
} else {
delete(endpoints, socketPath)
// Promote an arbitrary remaining endpoint to the primary endpoints map
for _, remainingEp := range endpoints {
m.endpoints[resourceName] = *remainingEp
break
}
}
}

View file

@ -432,9 +432,11 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Tests adding another resource.
resourceName2 := "resource2"
e2 := &endpointImpl{}
e2 := &endpointImpl{socket: socketName}
e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager)
testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
eInfo := endpointInfo{e: e2, opts: nil}
testManager.endpoints[resourceName2] = eInfo
testManager.endpointStore[resourceName2] = map[string]*endpointInfo{socketName: &eInfo}
callback(logger, resourceName2, devs)
capacity, allocatable, removedResources = testManager.GetCapacity()
as.Len(capacity, 2)
@ -808,6 +810,7 @@ type MockEndpoint struct {
getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
initChan chan []string
socket string
}
func (m *MockEndpoint) preStartContainer(_ context.Context, devs []string) (*pluginapi.PreStartContainerResponse, error) {
@ -835,6 +838,10 @@ func (m *MockEndpoint) isStopped() bool { return false }
func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
// socketPath returns the value of the mock's socket field.
func (m *MockEndpoint) socketPath() string {
return m.socket
}
func makePod(limits v1.ResourceList) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -2160,9 +2167,12 @@ func TestEndpointSyncOnDisconnect(t *testing.T) {
resourceName: resourceName,
client: plugin.NewPluginClient(resourceName, socketName, manager),
stopTime: time.Now().Add(-endpointStopGracePeriod * 2), // make the grace period expired
socket: socketName,
}
manager.endpoints[resourceName] = endpointInfo{e: ep, opts: nil}
eInfo := endpointInfo{e: ep, opts: nil}
manager.endpoints[resourceName] = eInfo
manager.endpointStore[resourceName] = map[string]*endpointInfo{socketName: &eInfo}
devs := []*pluginapi.Device{
{ID: "Device1", Health: pluginapi.Healthy},
{ID: "Device2", Health: pluginapi.Healthy},

View file

@ -32,7 +32,7 @@ type RegistrationHandler interface {
// ClientHandler is an interface for handling device plugin connections.
type ClientHandler interface {
PluginConnected(context.Context, string, DevicePlugin) error
PluginDisconnected(klog.Logger, string)
PluginDisconnected(klog.Logger, string, string)
PluginListAndWatchReceiver(klog.Logger, string, *api.ListAndWatchResponse)
}

View file

@ -42,6 +42,7 @@ type Client interface {
Connect(context.Context) error
Run(context.Context)
Disconnect(klog.Logger) error
SocketPath() string
}
type client struct {
@ -109,7 +110,7 @@ func (c *client) Disconnect(logger klog.Logger) error {
c.grpc = nil
}
c.mutex.Unlock()
c.handler.PluginDisconnected(logger, c.resource)
c.handler.PluginDisconnected(logger, c.resource, c.socket)
logger.V(2).Info("Device plugin disconnected", "resource", c.resource)
return nil

View file

@ -54,10 +54,11 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s
func (s *server) DeRegisterPlugin(pluginName, endpoint string) {
logger := klog.FromContext(context.TODO())
logger.V(2).Info("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint)
client := s.getClient(pluginName)
// endpoint in DeRegisterPlugin is the socket path
client := s.getClient(pluginName, endpoint)
if client != nil {
if err := s.disconnectClient(logger, pluginName, client); err != nil {
logger.Error(err, "disconnecting client", "plugin", pluginName, "endpoing", endpoint)
logger.Error(err, "disconnecting client", "plugin", pluginName, "endpoint", endpoint)
}
}
}
@ -86,12 +87,13 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri
s.registerClient(logger, name, c)
if err := c.Connect(ctx); err != nil {
s.deregisterClient(logger, name)
logger.Error(err, "Failed to connect to new client", "resource", name)
// Deregister the client again if the connection fails
s.deregisterClient(logger, name, socketPath)
logger.Error(err, "Failed to connect to new client", "resource", name, "socketPath", socketPath)
return err
}
logger.V(2).Info("Connected to new client", "resource", name)
logger.V(2).Info("Connected to new client", "resource", name, "socketPath", socketPath)
go func() {
s.runClient(ctx, name, c)
}()
@ -100,30 +102,46 @@ func (s *server) connectClient(ctx context.Context, name string, socketPath stri
}
func (s *server) disconnectClient(logger klog.Logger, name string, c Client) error {
s.deregisterClient(logger, name)
s.deregisterClient(logger, name, c.SocketPath())
return c.Disconnect(logger)
}
func (s *server) registerClient(logger klog.Logger, name string, c Client) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.clients[name] = c
logger.V(2).Info("Registered client", "name", name)
s.clients[name] = append(s.clients[name], c)
logger.V(2).Info("Registered client", "name", name, "socketPath", c.SocketPath())
}
func (s *server) deregisterClient(logger klog.Logger, name string) {
func (s *server) deregisterClient(logger klog.Logger, name string, socketPath string) {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.clients, name)
logger.V(2).Info("Deregistered client", "name", name)
// When a client is deregistered, we will rebuild the clients array, removing the given client.
// We intentionally avoid mutating in place.
// We only remove the connection when both the client name and socket path match.
// This ensures that if there are two connections with the same client name, only that specific client is removed.
var newClients []Client
for _, c := range s.clients[name] {
if c.SocketPath() == socketPath {
logger.V(2).Info("Deregistered client", "name", name, "socketPath", socketPath)
continue
}
newClients = append(newClients, c)
}
if len(newClients) == 0 {
delete(s.clients, name)
} else {
s.clients[name] = newClients
}
}
func (s *server) runClient(ctx context.Context, name string, c Client) {
logger := klog.FromContext(ctx)
c.Run(ctx)
c = s.getClient(name)
c = s.getClient(name, c.SocketPath())
if c == nil {
return
}
@ -133,8 +151,13 @@ func (s *server) runClient(ctx context.Context, name string, c Client) {
}
}
func (s *server) getClient(name string) Client {
func (s *server) getClient(name string, socketPath string) Client {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.clients[name]
for _, c := range s.clients[name] {
if c.SocketPath() == socketPath {
return c
}
}
return nil
}

View file

@ -55,7 +55,7 @@ type server struct {
grpc *grpc.Server
rhandler RegistrationHandler
chandler ClientHandler
clients map[string]Client
clients map[string][]Client
// lastError records the last runtime error. A server is considered healthy till an actual error occurs.
lastError error
@ -77,7 +77,7 @@ func NewServer(logger klog.Logger, socketPath string, rh RegistrationHandler, ch
socketDir: dir,
rhandler: rh,
chandler: ch,
clients: make(map[string]Client),
clients: make(map[string][]Client),
}
return s, nil
@ -198,10 +198,12 @@ func (s *server) isVersionCompatibleWithPlugin(versions ...string) bool {
func (s *server) visitClients(visit func(r string, c Client)) {
s.mutex.Lock()
for r, c := range s.clients {
s.mutex.Unlock()
visit(r, c)
s.mutex.Lock()
for r, cs := range s.clients {
for _, c := range cs {
s.mutex.Unlock()
visit(r, c)
s.mutex.Lock()
}
}
s.mutex.Unlock()
}

View file

@ -0,0 +1,312 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
admissionapi "k8s.io/pod-security-admission/api"
)
// Registers the "Device Plugin Multiple" suite, which runs testDevicePluginMultiple
// against the v1beta1 device plugin socket directory.
// Serial because the test restarts Kubelet
// NOTE(review): the framework base name is "device-plugin-errors" although this
// suite is "Device Plugin Multiple" — confirm this is intended.
var _ = SIGDescribe("Device Plugin Multiple", framework.WithSerial(), feature.DevicePlugin, func() {
f := framework.NewDefaultFramework("device-plugin-errors")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
testDevicePluginMultiple(f, kubeletdevicepluginv1beta1.DevicePluginPath)
})
// testDevicePluginMultiple registers the "DevicePlugin" e2e context, which
// exercises two sample device plugin pods ("dp1" and "dp2") on the local node.
//
// pluginSockDir is the device plugin socket directory; it is normalized with
// filepath.Clean and given a trailing "/" before being handed to the sample
// device plugin pod builders.
//
// Every spec starts (BeforeEach) with exactly one Ready node, empty v1alpha1
// and v1 podresources listings, and a running "dp1" plugin pod whose devices
// are reported as capacity and allocatable (e2enode.SampleDevsAmount). Every
// spec ends (AfterEach) by deleting the plugin pods and all pods in the test
// namespace, restarting the kubelet, and waiting for the sample device capacity
// to drop to zero.
func testDevicePluginMultiple(f *framework.Framework, pluginSockDir string) {
	pluginSockDir = filepath.Clean(pluginSockDir) + "/"
	f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() {
		var devicePluginPod, devicePluginPod2 *v1.Pod
		var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse
		var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse
		var err error

		ginkgo.BeforeEach(func(ctx context.Context) {
			// devicePluginPod2 is shared by all specs; clear it so AfterEach never
			// acts on a pod left over from a previous spec.
			devicePluginPod2 = nil

			ginkgo.By("Wait for node to be ready")
			gomega.Eventually(ctx, func(ctx context.Context) bool {
				nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
				framework.ExpectNoError(err)
				return nodes == 1
			}, time.Minute, time.Second).Should(gomega.BeTrueBecause("expected node to be ready"))

			// Before we run the device plugin test, we need to ensure
			// that the cluster is in a clean state and there are no
			// pods running on this node.
			// This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of its resources may still be in progress.
			// xref: https://issue.k8s.io/115381
			gomega.Eventually(ctx, func(ctx context.Context) error {
				v1alphaPodResources, err = getV1alpha1NodeDevices(ctx)
				if err != nil {
					return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %w", err)
				}

				v1PodResources, err = getV1NodeDevices(ctx)
				if err != nil {
					return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %w", err)
				}

				if len(v1alphaPodResources.PodResources) > 0 {
					return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources)
				}

				if len(v1PodResources.PodResources) > 0 {
					return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources)
				}
				return nil
			}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed())

			ginkgo.By("Scheduling a sample device plugin pod")
			dp := getSampleDevicePluginPod(pluginSockDir, "dp1")
			devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp)

			ginkgo.By("Waiting for devices to become available on the local node")
			gomega.Eventually(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) > 0
			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be available on local node"))
			framework.Logf("Successfully created device plugin pod")

			ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", e2enode.SampleDevsAmount))
			gomega.Eventually(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready &&
					e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount &&
					e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected resource to be available on local node"))
		})

		ginkgo.AfterEach(func(ctx context.Context) {
			ginkgo.By("Deleting the device plugin pods")
			if devicePluginPod != nil {
				e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
			}
			if devicePluginPod2 != nil {
				e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
			}

			ginkgo.By("Deleting any Pods created by the test")
			l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{})
			framework.ExpectNoError(err)
			for _, p := range l.Items {
				if p.Namespace != f.Namespace.Name {
					continue
				}

				framework.Logf("Deleting pod: %s", p.Name)
				e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
			}

			restartKubelet(ctx, true)

			ginkgo.By("Waiting for devices to become unavailable on the local node")
			gomega.Eventually(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) <= 0
			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrueBecause("expected devices to be unavailable on local node"))

			ginkgo.By("devices now unavailable on the local node")
		})

		// Pod1 scheduled with DP1, DP2 appears, Pod2 scheduled successfully with both DPs running, DP1 is deleted,
		// Pod3 is scheduled successfully after DP1 is removed.
		ginkgo.It("Device Plugin Multiple: Basic rollout of device plugins with zero downtime", func(ctx context.Context) {
			ginkgo.By("Scheduling Pod1 with DP1 successfully")
			podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
			pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))
			pod1, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Creating DP2")
			dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2")
			devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2)

			ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged")
			gomega.Consistently(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears"))

			ginkgo.By("Verifying DP2 is running")
			dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Scheduling Pod2 while both are running")
			pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))

			ginkgo.By("Verifying Pod2 is running")
			pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Deleting DP1")
			e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
			waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace)

			ginkgo.By("Waiting for DP1 to unregister, and number of devices to remain unchanged")
			// The assertion requires the device counts to stay at SampleDevsAmount,
			// so the failure message must say "unchanged", not "decrease".
			gomega.Consistently(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices to remain unchanged after DP1 disappears"))

			ginkgo.By("Deleting Pod1 to free up resources")
			e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)

			ginkgo.By("Scheduling Pod3 successfully after DP1 disappeared")
			pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))

			ginkgo.By("Verifying Pod3 is running")
			pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning))
		})

		// Pod1 scheduled with DP1, DP2 appears after 30 seconds, Pod2 scheduled successfully before DP2 connected, Pod3 scheduled successfully after DP2 connected.
		ginkgo.It("Device Plugin Multiple: DP2 takes long time to start working", func(ctx context.Context) {
			var err error

			ginkgo.By("Scheduling Pod1 with DP1 successfully")
			podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
			pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))
			pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Scheduling DP2 with a delay")
			dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2")
			// Wrap the plugin binary in a shell so it only starts after a 30s sleep.
			dp2.Spec.Containers[0].Command = []string{"/bin/sh", "-c"}
			dp2.Spec.Containers[0].Args = []string{"sleep 30 && exec /sampledeviceplugin -alsologtostderr"}
			devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2)

			ginkgo.By("Scheduling Pod2 successfully")
			pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))

			ginkgo.By("Verifying Pod2 is running")
			pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged")
			gomega.Consistently(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears"))

			ginkgo.By("Verifying DP2 is running")
			dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Deleting Pod1 to free up resources")
			e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)

			ginkgo.By("Scheduling Pod3 after DP2 registration succeeds")
			pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))

			ginkgo.By("Verifying Pod3 is running")
			pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning))
		})

		// Pod1 scheduled with DP1, DP2 appears, Pod2 scheduled successfully with both DPs running,
		// DP2 is deleted, Pod3 is scheduled successfully after DP2 is removed.
		ginkgo.It("Device Plugin Multiple: DP2 changed its mind", func(ctx context.Context) {
			var err error

			ginkgo.By("Scheduling Pod1 with DP1 successfully")
			podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
			pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))
			pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod1.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Scheduling DP2")
			dp2 := getSampleDevicePluginPodMultiple(pluginSockDir, "dp2")
			devicePluginPod2 = e2epod.NewPodClient(f).CreateSync(ctx, dp2)

			ginkgo.By("Waiting for DP2 to register for 30s, and number of devices to remain unchanged")
			gomega.Consistently(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 appears"))

			ginkgo.By("Verifying DP2 is running")
			dp2Pod, err := e2epod.NewPodClient(f).Get(ctx, devicePluginPod2.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(dp2Pod.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Scheduling Pod2 while both are running")
			pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))

			ginkgo.By("Verifying Pod2 is running")
			pod2, err = e2epod.NewPodClient(f).Get(ctx, pod2.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod2.Status.Phase).To(gomega.Equal(v1.PodRunning))

			ginkgo.By("Deleting DP2")
			e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod2.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
			waitForContainerRemoval(ctx, devicePluginPod2.Spec.Containers[0].Name, devicePluginPod2.Name, devicePluginPod2.Namespace)

			ginkgo.By("Waiting for DP2 to unregister, and number of devices to remain unchanged")
			gomega.Consistently(ctx, func(ctx context.Context) bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && e2enode.CountSampleDeviceCapacity(node) == e2enode.SampleDevsAmount && e2enode.CountSampleDeviceAllocatable(node) == e2enode.SampleDevsAmount
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrueBecause("expected devices after DP2 disappears"))

			ginkgo.By("Deleting Pod1 to free up resources")
			e2epod.NewPodClient(f).DeleteSync(ctx, pod1.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)

			ginkgo.By("Scheduling Pod3 successfully after DP2 disappeared")
			pod3 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(e2enode.SampleDeviceResourceName, podRECMD))

			ginkgo.By("Verifying Pod3 is running")
			pod3, err = e2epod.NewPodClient(f).Get(ctx, pod3.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(pod3.Status.Phase).To(gomega.Equal(v1.PodRunning))
		})
	})
}
// getSampleDevicePluginPodMultiple builds the sample device plugin pod via
// getSampleDevicePluginPod(pluginSockDir, version) and then blanks the value of
// every CDI_ENABLED environment variable on the pod's first container.
//
// It exists as a stop-gap so a second ("duplicate") plugin pod can be created
// with CDI turned off without maintaining a separate device plugin YAML, since
// getSampleDevicePluginPod hardcodes CDI.
func getSampleDevicePluginPodMultiple(pluginSockDir string, version string) *v1.Pod {
	pod := getSampleDevicePluginPod(pluginSockDir, version)
	// env aliases the container's Env slice, so writes through it land on the pod.
	env := pod.Spec.Containers[0].Env
	for idx := range env {
		if env[idx].Name != "CDI_ENABLED" {
			continue
		}
		env[idx].Value = ""
	}
	return pod
}

View file

@ -170,7 +170,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed())
ginkgo.By("Scheduling a sample device plugin pod")
dp := getSampleDevicePluginPod(pluginSockDir)
dp := getSampleDevicePluginPod(pluginSockDir, "")
dptemplate = dp.DeepCopy()
devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp)
@ -1104,16 +1104,20 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta
}
// getSampleDevicePluginPod returns the Sample Device Plugin pod to be used e2e tests.
func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod {
func getSampleDevicePluginPod(pluginSockDir string, version string) *v1.Pod {
data, err := e2etestfiles.Read(e2enode.SampleDevicePluginDSYAML)
if err != nil {
framework.Fail(err.Error())
}
ds := readDaemonSetV1OrDie(data)
podName := e2enode.SampleDevicePluginName
if version != "" {
podName = podName + "-" + version
}
dp := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: e2enode.SampleDevicePluginName,
Name: podName,
},
Spec: ds.Spec.Template.Spec,
}
@ -1124,6 +1128,9 @@ func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod {
}
dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "CDI_ENABLED", Value: "1"})
if version != "" {
dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "UNIQUE_NAME", Value: version})
}
return dp
}

View file

@ -2118,7 +2118,7 @@ func getOnlineCPUs() (cpuset.CPUSet, error) {
func setupSampleDevicePluginOrFail(ctx context.Context, f *framework.Framework) *v1.Pod {
e2enode.WaitForNodeToBeReady(ctx, f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)
dp := getSampleDevicePluginPod(kubeletdevicepluginv1beta1.DevicePluginPath)
dp := getSampleDevicePluginPod(kubeletdevicepluginv1beta1.DevicePluginPath, "dp")
dp.Spec.NodeName = framework.TestContext.NodeName
ginkgo.By("Create the sample device plugin pod")