diff --git a/test/integration/.import-restrictions b/test/integration/.import-restrictions index 4d8b5843f88..e36690fff6e 100644 --- a/test/integration/.import-restrictions +++ b/test/integration/.import-restrictions @@ -12,6 +12,9 @@ rules: - selectorRegexp: k8s[.]io/kubernetes/test/e2e/framework/(pod|daemonset) allowedPrefixes: - "" + - selectorRegexp: k8s[.]io/kubernetes/test/e2e/dra/utils + allowedPrefixes: + - "" - selectorRegexp: k8s[.]io/kubernetes/test/e2e forbiddenPrefixes: - "" diff --git a/test/integration/dra/dra_test.go b/test/integration/dra/dra_test.go index c4e6aa872d3..e8ed25f1687 100644 --- a/test/integration/dra/dra_test.go +++ b/test/integration/dra/dra_test.go @@ -61,6 +61,7 @@ import ( "k8s.io/klog/v2" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller/resourceclaim" resourceclaimmetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -172,6 +173,7 @@ func TestDRA(t *testing.T) { tCtx = tCtx.WithNamespace(namespace) TestCreateResourceSlices(tCtx, 100) }) + tCtx.Run("ShareResourceClaimSequentially", testShareResourceClaimSequentially) tCtx.Run("UsesAllResources", testUsesAllResources) }, }, @@ -249,6 +251,7 @@ func TestDRA(t *testing.T) { // in the experimental channel has an improvement that requires a higher number here than // in the incubating and stable channels. tCtx.Run("FilterTimeout", func(tCtx ktesting.TContext) { testFilterTimeout(tCtx, 20) }) + tCtx.Run("ShareResourceClaimSequentially", testShareResourceClaimSequentially) tCtx.Run("UsesAllResources", testUsesAllResources) }, }, @@ -288,6 +291,7 @@ func TestDRA(t *testing.T) { createNodes(tCtx) tCtx = prepareScheduler(tCtx) + tCtx = prepareClaimController(tCtx) tc.f(tCtx) }) @@ -381,6 +385,27 @@ func startSchedulerWithConfig(tCtx ktesting.TContext, config string) { scheduler.start(tCtx, config) } +// prepareClaimController does the same as prepareScheduler for the ResourceClaimController. +func prepareClaimController(tCtx ktesting.TContext) ktesting.TContext { + claimController := &claimControllerSingleton{ + rootCtx: tCtx, + } + + return tCtx.WithValue(claimControllerKey, claimController) +} + +// startClaimController can be used by tests to ensure that the ResourceClaim controller is running. +// This may be used in parallel tests. +func startClaimController(tCtx ktesting.TContext) { + tCtx.Helper() + value := tCtx.Value(claimControllerKey) + if value == nil { + tCtx.Fatal("internal error: startClaimController without a prior prepareClaimController call") + } + claimController := value.(*claimControllerSingleton) + claimController.start(tCtx) +} + type schedulerSingleton struct { rootCtx ktesting.TContext @@ -447,6 +472,84 @@ func newSchedulerComponentConfig(tCtx ktesting.TContext, cfgData string) *config return &cfg } +type claimControllerSingleton struct { + rootCtx ktesting.TContext + + mutex sync.Mutex + usageCount int + wg sync.WaitGroup + informerFactory informers.SharedInformerFactory + cancel func(cause string) +} + +func (claimController *claimControllerSingleton) start(tCtx ktesting.TContext) { + tCtx.Helper() + claimController.mutex.Lock() + defer claimController.mutex.Unlock() + + claimController.usageCount++ + tCtx.CleanupCtx(claimController.stop) + if claimController.usageCount > 1 { + // Already started earlier. + return + } + + // Run claimController with default configuration. This must use the root context because + // the per-test tCtx passed to start will get canceled once the test which triggered + // starting the claimController is done. + tCtx = claimController.rootCtx + tCtx.Logf("Starting the ResourceClaim controller for test %s...", tCtx.Name()) + tCtx = tCtx.WithLogger(klog.LoggerWithName(tCtx.Logger(), "claimController")) + + claimControllerCtx := tCtx.WithCancel() + claimController.cancel = claimControllerCtx.Cancel + + client := claimControllerCtx.Client() + claimController.informerFactory = informers.NewSharedInformerFactory(client, 0 /* resync period */) + controller, err := resourceclaim.NewController( + klog.FromContext(claimControllerCtx), + resourceclaim.Features{ + AdminAccess: utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), + PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList), + }, + claimControllerCtx.Client(), + claimController.informerFactory.Core().V1().Pods(), + claimController.informerFactory.Resource().V1().ResourceClaims(), + claimController.informerFactory.Resource().V1().ResourceClaimTemplates(), + ) + tCtx.ExpectNoError(err, "create ResourceClaim controller") + + claimController.informerFactory.Start(claimControllerCtx.Done()) + claimController.wg.Go(func() { + controller.Run(claimControllerCtx, 1 /* one worker to get more readable log output without interleaving */) + }) + tCtx.Logf("Started the claimController for test %s.", tCtx.Name()) +} + +func (claimController *claimControllerSingleton) stop(tCtx ktesting.TContext) { + claimController.mutex.Lock() + defer claimController.mutex.Unlock() + + claimController.usageCount-- + if claimController.usageCount > 0 { + // Still in use by some other test. + return + } + + claimController.rootCtx.Logf("Stopping the ResourceClaim controller after test %s...", tCtx.Name()) + if claimController.cancel != nil { + claimController.cancel("test is done") + } + if claimController.informerFactory != nil { + claimController.informerFactory.Shutdown() + } + claimController.wg.Wait() +} + +type claimControllerKeyType int + +var claimControllerKey claimControllerKeyType + // testPod creates a pod with a resource claim reference and then checks // whether that field is or isn't getting dropped. func testPod(tCtx ktesting.TContext, draEnabled bool) { diff --git a/test/integration/dra/resourceclaim_test.go b/test/integration/dra/resourceclaim_test.go new file mode 100644 index 00000000000..a1838189b73 --- /dev/null +++ b/test/integration/dra/resourceclaim_test.go @@ -0,0 +1,172 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "sync" + "time" + + "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + drautils "k8s.io/kubernetes/test/e2e/dra/utils" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" +) + +// testShareResourceClaim is the former "sharing a claim sequentially" test which was removed from E2E testing +// in https://github.com/kubernetes/kubernetes/pull/133214. It creates a single ResourceClaim and then schedules +// more pods using that ResourceClaim than supported at the same time. Scheduling so many pods in an E2E test +// was problematic, but in an integration test it's fine because we control the environment. +func testShareResourceClaimSequentially(tCtx ktesting.TContext) { + // Some E2E helpers still use Gomega directly. + // TODO: rewrite those helpers to use TContext. + // In the meantime, make Gomega work by running sequentially. + gomega.RegisterTestingT(tCtx) + tCtx.Cleanup(func() { + gomega.RegisterFailHandler(nil) + }) + + tCtx = tCtx.WithNamespace(createTestNamespace(tCtx, nil)) + startScheduler(tCtx) + startClaimController(tCtx) + + nodes := drautils.NewNodesNow(tCtx, 1, 8) + driver := drautils.NewDriverInstance(tCtx) + driver.WithRealNodes = false + driver.WithKubelet = false + driver.Run(tCtx, "/no/kubelet/root", nodes, drautils.NetworkResources(1, false)(nodes)) + b := drautils.NewBuilderNow(tCtx, driver) + + var objects []klog.KMetadata + objects = append(objects, b.ExternalClaim()) + + // This test used to test usage of the claim by one pod + // at a time. After removing the "not sharable" + // feature and bumping up the maximum number of + // consumers this is became a stress test. + numMaxPods := resourceapi.ResourceClaimReservedForMaxSize + tCtx.Logf("Creating %d pods sharing the same claim", numMaxPods) + pods := make([]*v1.Pod, numMaxPods) + for i := range numMaxPods { + pod := b.PodExternal() + pods[i] = pod + objects = append(objects, pod) + } + b.Create(tCtx, objects...) + + podStartTimeout := 5 * time.Minute * time.Duration(numMaxPods) + ensureDuration := time.Minute // Don't check for too long, even if it is less precise. + podIsScheduled := gomega.HaveField("Spec.NodeName", gomega.Not(gomega.BeEmpty())) + podIsPending := gomega.And( + gomega.HaveField("Status.Phase", gomega.Equal(v1.PodPending)), + gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Type": gomega.Equal(v1.PodScheduled), + "Status": gomega.Equal(v1.ConditionFalse), + // With the current scheduler code, creating too many pods is treated as an error, + // so this would have to be "SchedulerError". Maybe it shouldn't be an error? + // "Reason": gomega.Equal(v1.PodReasonUnschedulable), + }))), + ) + assertPodScheduledEventually := func(tCtx ktesting.TContext, pod *v1.Pod) { + tCtx.Helper() + tCtx.AssertEventually(tCtx.Client().CoreV1().Pods(pod.Namespace).Get). + WithArguments(pod.Name, metav1.GetOptions{}). + WithTimeout(podStartTimeout). + WithPolling(10*time.Second). + Should(podIsScheduled, "Pod %s should get scheduled.", pod.Name) + } + + assertPodPendingEventually := func(tCtx ktesting.TContext, pod *v1.Pod) { + tCtx.Helper() + tCtx.AssertEventually(tCtx.Client().CoreV1().Pods(pod.Namespace).Get). + WithArguments(pod.Name, metav1.GetOptions{}). + WithTimeout(ensureDuration). + WithPolling(10*time.Second). + Should(podIsPending, "Pod %s should remain pending.", pod.Name) + } + + // We don't know the order. All that matters is that all of them get scheduled eventually. + tCtx.Logf("Waiting for %d pods to be scheduled", numMaxPods) + var wg sync.WaitGroup + for i := range numMaxPods { + wg.Go(func() { + assertPodScheduledEventually(tCtx, pods[i]) + }) + } + wg.Wait() + if tCtx.Failed() { + return + } + + // TODO (?): check metrics about pod scheduling, speed up pod scheduling. + // Currently pods go into backoff, so the initial 256 pods get scheduled much + // more slowly that they could be. + + numMorePods := 10 + tCtx.Logf("Creating %d additional pods for the same claim", numMorePods) + morePods := make([]*v1.Pod, numMorePods) + objects = nil + for i := range numMorePods { + pod := b.PodExternal() + morePods[i] = pod + objects = append(objects, pod) + } + b.Create(tCtx, objects...) + + // None of the additional pods can run because of the ReservedFor limit. + tCtx.Logf("Check for %s that the additional pods don't get scheduled", ensureDuration) + for i := range numMorePods { + wg.Go(func() { + assertPodPendingEventually(tCtx, morePods[i]) + }) + } + wg.Wait() + if tCtx.Failed() { + return + } + + // We need to force-delete (no kubelet!) each scheduled pod, + // otherwise the new ones cannot use the claim. + tCtx.Logf("Deleting the initial %d pods", numMaxPods) + for i := range numMaxPods { + wg.Go(func() { + if !tCtx.AssertNoError(tCtx.Client().CoreV1().Pods(pods[i].Namespace).Delete(tCtx, pods[i].Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(0))})) { + return + } + // Should be almost (completely?) instantaneous. + tCtx.AssertNoError(e2epod.WaitForPodNotFoundInNamespace(tCtx, tCtx.Client(), pods[i].Name, pods[i].Namespace, time.Second)) + }) + } + wg.Wait() + if tCtx.Failed() { + return + } + + // Now those should also get scheduled - eventually... + tCtx.Logf("Waiting for the additional %d pods to be scheduled", numMorePods) + for i := range numMorePods { + wg.Go(func() { + assertPodScheduledEventually(tCtx, morePods[i]) + }) + } + wg.Wait() +}