From 0cb57b97e7c2ab08070b26d0e3681046fa2e0f5c Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 18 Dec 2025 11:20:21 +0100 Subject: [PATCH] DRA integration: test sharing a claim sequentially This used to be an E2E test, but it turned out to be too slow and unreliable and therefore got removed. As an integration test we have a bit better control over the environment, so it should be possible to avoid the same flakes. Some of the slowness comes from pods entering backoff. Maybe this is an opportunity for future improvements. To support this test, the ResourceClaim controller is needed. The framework can start it on demand now, similar to how the scheduler was handled already. --- test/integration/.import-restrictions | 3 + test/integration/dra/dra_test.go | 103 ++++++++++++ test/integration/dra/resourceclaim_test.go | 172 +++++++++++++++++++++ 3 files changed, 278 insertions(+) create mode 100644 test/integration/dra/resourceclaim_test.go diff --git a/test/integration/.import-restrictions b/test/integration/.import-restrictions index 4d8b5843f88..e36690fff6e 100644 --- a/test/integration/.import-restrictions +++ b/test/integration/.import-restrictions @@ -12,6 +12,9 @@ rules: - selectorRegexp: k8s[.]io/kubernetes/test/e2e/framework/(pod|daemonset) allowedPrefixes: - "" + - selectorRegexp: k8s[.]io/kubernetes/test/e2e/dra/utils + allowedPrefixes: + - "" - selectorRegexp: k8s[.]io/kubernetes/test/e2e forbiddenPrefixes: - "" diff --git a/test/integration/dra/dra_test.go b/test/integration/dra/dra_test.go index c4e6aa872d3..e8ed25f1687 100644 --- a/test/integration/dra/dra_test.go +++ b/test/integration/dra/dra_test.go @@ -61,6 +61,7 @@ import ( "k8s.io/klog/v2" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller/resourceclaim" resourceclaimmetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics" "k8s.io/kubernetes/pkg/features" 
"k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -172,6 +173,7 @@ func TestDRA(t *testing.T) { tCtx = tCtx.WithNamespace(namespace) TestCreateResourceSlices(tCtx, 100) }) + tCtx.Run("ShareResourceClaimSequentially", testShareResourceClaimSequentially) tCtx.Run("UsesAllResources", testUsesAllResources) }, }, @@ -249,6 +251,7 @@ func TestDRA(t *testing.T) { // in the experimental channel has an improvement that requires a higher number here than // in the incubating and stable channels. tCtx.Run("FilterTimeout", func(tCtx ktesting.TContext) { testFilterTimeout(tCtx, 20) }) + tCtx.Run("ShareResourceClaimSequentially", testShareResourceClaimSequentially) tCtx.Run("UsesAllResources", testUsesAllResources) }, }, @@ -288,6 +291,7 @@ func TestDRA(t *testing.T) { createNodes(tCtx) tCtx = prepareScheduler(tCtx) + tCtx = prepareClaimController(tCtx) tc.f(tCtx) }) @@ -381,6 +385,27 @@ func startSchedulerWithConfig(tCtx ktesting.TContext, config string) { scheduler.start(tCtx, config) } +// prepareClaimController does the same as prepareScheduler for the ResourceClaimController. +func prepareClaimController(tCtx ktesting.TContext) ktesting.TContext { + claimController := &claimControllerSingleton{ + rootCtx: tCtx, + } + + return tCtx.WithValue(claimControllerKey, claimController) +} + +// startClaimController can be used by tests to ensure that the ResourceClaim controller is running. +// This may be used in parallel tests. 
+func startClaimController(tCtx ktesting.TContext) { + tCtx.Helper() + value := tCtx.Value(claimControllerKey) + if value == nil { + tCtx.Fatal("internal error: startClaimController without a prior prepareClaimController call") + } + claimController := value.(*claimControllerSingleton) + claimController.start(tCtx) +} + type schedulerSingleton struct { rootCtx ktesting.TContext @@ -447,6 +472,84 @@ func newSchedulerComponentConfig(tCtx ktesting.TContext, cfgData string) *config return &cfg } +type claimControllerSingleton struct { + rootCtx ktesting.TContext + + mutex sync.Mutex + usageCount int + wg sync.WaitGroup + informerFactory informers.SharedInformerFactory + cancel func(cause string) +} + +func (claimController *claimControllerSingleton) start(tCtx ktesting.TContext) { + tCtx.Helper() + claimController.mutex.Lock() + defer claimController.mutex.Unlock() + + claimController.usageCount++ + tCtx.CleanupCtx(claimController.stop) + if claimController.usageCount > 1 { + // Already started earlier. + return + } + + // Run claimController with default configuration. This must use the root context because + // the per-test tCtx passed to start will get canceled once the test which triggered + // starting the claimController is done. 
+ tCtx = claimController.rootCtx + tCtx.Logf("Starting the ResourceClaim controller for test %s...", tCtx.Name()) + tCtx = tCtx.WithLogger(klog.LoggerWithName(tCtx.Logger(), "claimController")) + + claimControllerCtx := tCtx.WithCancel() + claimController.cancel = claimControllerCtx.Cancel + + client := claimControllerCtx.Client() + claimController.informerFactory = informers.NewSharedInformerFactory(client, 0 /* resync period */) + controller, err := resourceclaim.NewController( + klog.FromContext(claimControllerCtx), + resourceclaim.Features{ + AdminAccess: utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), + PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList), + }, + claimControllerCtx.Client(), + claimController.informerFactory.Core().V1().Pods(), + claimController.informerFactory.Resource().V1().ResourceClaims(), + claimController.informerFactory.Resource().V1().ResourceClaimTemplates(), + ) + tCtx.ExpectNoError(err, "create ResourceClaim controller") + + claimController.informerFactory.Start(claimControllerCtx.Done()) + claimController.wg.Go(func() { + controller.Run(claimControllerCtx, 1 /* one worker to get more readable log output without interleaving */) + }) + tCtx.Logf("Started the claimController for test %s.", tCtx.Name()) +} + +func (claimController *claimControllerSingleton) stop(tCtx ktesting.TContext) { + claimController.mutex.Lock() + defer claimController.mutex.Unlock() + + claimController.usageCount-- + if claimController.usageCount > 0 { + // Still in use by some other test. 
+ return + } + + claimController.rootCtx.Logf("Stopping the ResourceClaim controller after test %s...", tCtx.Name()) + if claimController.cancel != nil { + claimController.cancel("test is done") + } + if claimController.informerFactory != nil { + claimController.informerFactory.Shutdown() + } + claimController.wg.Wait() +} + +type claimControllerKeyType int + +var claimControllerKey claimControllerKeyType + // testPod creates a pod with a resource claim reference and then checks // whether that field is or isn't getting dropped. func testPod(tCtx ktesting.TContext, draEnabled bool) { diff --git a/test/integration/dra/resourceclaim_test.go b/test/integration/dra/resourceclaim_test.go new file mode 100644 index 00000000000..a1838189b73 --- /dev/null +++ b/test/integration/dra/resourceclaim_test.go @@ -0,0 +1,172 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "sync" + "time" + + "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + drautils "k8s.io/kubernetes/test/e2e/dra/utils" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" +) + +// testShareResourceClaimSequentially is the former "sharing a claim sequentially" test which was removed from E2E testing +// in https://github.com/kubernetes/kubernetes/pull/133214. 
It creates a single ResourceClaim and then schedules +// more pods using that ResourceClaim than supported at the same time. Scheduling so many pods in an E2E test +// was problematic, but in an integration test it's fine because we control the environment. +func testShareResourceClaimSequentially(tCtx ktesting.TContext) { + // Some E2E helpers still use Gomega directly. + // TODO: rewrite those helpers to use TContext. + // In the meantime, make Gomega work by running sequentially. + gomega.RegisterTestingT(tCtx) + tCtx.Cleanup(func() { + gomega.RegisterFailHandler(nil) + }) + + tCtx = tCtx.WithNamespace(createTestNamespace(tCtx, nil)) + startScheduler(tCtx) + startClaimController(tCtx) + + nodes := drautils.NewNodesNow(tCtx, 1, 8) + driver := drautils.NewDriverInstance(tCtx) + driver.WithRealNodes = false + driver.WithKubelet = false + driver.Run(tCtx, "/no/kubelet/root", nodes, drautils.NetworkResources(1, false)(nodes)) + b := drautils.NewBuilderNow(tCtx, driver) + + var objects []klog.KMetadata + objects = append(objects, b.ExternalClaim()) + + // This test used to test usage of the claim by one pod + // at a time. After removing the "not sharable" + // feature and bumping up the maximum number of + // consumers this became a stress test. + numMaxPods := resourceapi.ResourceClaimReservedForMaxSize + tCtx.Logf("Creating %d pods sharing the same claim", numMaxPods) + pods := make([]*v1.Pod, numMaxPods) + for i := range numMaxPods { + pod := b.PodExternal() + pods[i] = pod + objects = append(objects, pod) + } + b.Create(tCtx, objects...) + + podStartTimeout := 5 * time.Minute * time.Duration(numMaxPods) + ensureDuration := time.Minute // Don't check for too long, even if it is less precise. 
+ podIsScheduled := gomega.HaveField("Spec.NodeName", gomega.Not(gomega.BeEmpty())) + podIsPending := gomega.And( + gomega.HaveField("Status.Phase", gomega.Equal(v1.PodPending)), + gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Type": gomega.Equal(v1.PodScheduled), + "Status": gomega.Equal(v1.ConditionFalse), + // With the current scheduler code, creating too many pods is treated as an error, + // so this would have to be "SchedulerError". Maybe it shouldn't be an error? + // "Reason": gomega.Equal(v1.PodReasonUnschedulable), + }))), + ) + assertPodScheduledEventually := func(tCtx ktesting.TContext, pod *v1.Pod) { + tCtx.Helper() + tCtx.AssertEventually(tCtx.Client().CoreV1().Pods(pod.Namespace).Get). + WithArguments(pod.Name, metav1.GetOptions{}). + WithTimeout(podStartTimeout). + WithPolling(10*time.Second). + Should(podIsScheduled, "Pod %s should get scheduled.", pod.Name) + } + + assertPodPendingEventually := func(tCtx ktesting.TContext, pod *v1.Pod) { + tCtx.Helper() + tCtx.AssertEventually(tCtx.Client().CoreV1().Pods(pod.Namespace).Get). + WithArguments(pod.Name, metav1.GetOptions{}). + WithTimeout(ensureDuration). + WithPolling(10*time.Second). + Should(podIsPending, "Pod %s should remain pending.", pod.Name) + } + + // We don't know the order. All that matters is that all of them get scheduled eventually. + tCtx.Logf("Waiting for %d pods to be scheduled", numMaxPods) + var wg sync.WaitGroup + for i := range numMaxPods { + wg.Go(func() { + assertPodScheduledEventually(tCtx, pods[i]) + }) + } + wg.Wait() + if tCtx.Failed() { + return + } + + // TODO (?): check metrics about pod scheduling, speed up pod scheduling. + // Currently pods go into backoff, so the initial 256 pods get scheduled much + // more slowly than they could be. 
+ + numMorePods := 10 + tCtx.Logf("Creating %d additional pods for the same claim", numMorePods) + morePods := make([]*v1.Pod, numMorePods) + objects = nil + for i := range numMorePods { + pod := b.PodExternal() + morePods[i] = pod + objects = append(objects, pod) + } + b.Create(tCtx, objects...) + + // None of the additional pods can run because of the ReservedFor limit. + tCtx.Logf("Check for %s that the additional pods don't get scheduled", ensureDuration) + for i := range numMorePods { + wg.Go(func() { + assertPodPendingEventually(tCtx, morePods[i]) + }) + } + wg.Wait() + if tCtx.Failed() { + return + } + + // We need to force-delete (no kubelet!) each scheduled pod, + // otherwise the new ones cannot use the claim. + tCtx.Logf("Deleting the initial %d pods", numMaxPods) + for i := range numMaxPods { + wg.Go(func() { + if !tCtx.AssertNoError(tCtx.Client().CoreV1().Pods(pods[i].Namespace).Delete(tCtx, pods[i].Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(0))})) { + return + } + // Should be almost (completely?) instantaneous. + tCtx.AssertNoError(e2epod.WaitForPodNotFoundInNamespace(tCtx, tCtx.Client(), pods[i].Name, pods[i].Namespace, time.Second)) + }) + } + wg.Wait() + if tCtx.Failed() { + return + } + + // Now those should also get scheduled - eventually... + tCtx.Logf("Waiting for the additional %d pods to be scheduled", numMorePods) + for i := range numMorePods { + wg.Go(func() { + assertPodScheduledEventually(tCtx, morePods[i]) + }) + } + wg.Wait() +}