DRA integration: test sharing a claim sequentially

This used to be an E2E test, but it turned out to be too slow and unreliable
and therefore got removed. As an integration test we have a bit better control
over the environment, so it should be possible to avoid the same flakes.

Some of the slowness comes from pods entering backoff. Maybe this is an
opportunity for future improvements.

To support this test, the ResourceClaim controller is needed. The framework
can start it on demand now, similar to how the scheduler was handled already.
This commit is contained in:
Patrick Ohly 2025-12-18 11:20:21 +01:00
parent bff684d951
commit 0cb57b97e7
3 changed files with 278 additions and 0 deletions

View file

@ -12,6 +12,9 @@ rules:
- selectorRegexp: k8s[.]io/kubernetes/test/e2e/framework/(pod|daemonset)
allowedPrefixes:
- ""
- selectorRegexp: k8s[.]io/kubernetes/test/e2e/dra/utils
allowedPrefixes:
- ""
- selectorRegexp: k8s[.]io/kubernetes/test/e2e
forbiddenPrefixes:
- ""

View file

@ -61,6 +61,7 @@ import (
"k8s.io/klog/v2"
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/resourceclaim"
resourceclaimmetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -172,6 +173,7 @@ func TestDRA(t *testing.T) {
tCtx = tCtx.WithNamespace(namespace)
TestCreateResourceSlices(tCtx, 100)
})
tCtx.Run("ShareResourceClaimSequentially", testShareResourceClaimSequentially)
tCtx.Run("UsesAllResources", testUsesAllResources)
},
},
@ -249,6 +251,7 @@ func TestDRA(t *testing.T) {
// in the experimental channel has an improvement that requires a higher number here than
// in the incubating and stable channels.
tCtx.Run("FilterTimeout", func(tCtx ktesting.TContext) { testFilterTimeout(tCtx, 20) })
tCtx.Run("ShareResourceClaimSequentially", testShareResourceClaimSequentially)
tCtx.Run("UsesAllResources", testUsesAllResources)
},
},
@ -288,6 +291,7 @@ func TestDRA(t *testing.T) {
createNodes(tCtx)
tCtx = prepareScheduler(tCtx)
tCtx = prepareClaimController(tCtx)
tc.f(tCtx)
})
@ -381,6 +385,27 @@ func startSchedulerWithConfig(tCtx ktesting.TContext, config string) {
scheduler.start(tCtx, config)
}
// prepareClaimController is the ResourceClaim controller counterpart of
// prepareScheduler: it does not start anything yet. It only attaches a fresh
// claimControllerSingleton, rooted at tCtx, to the returned context so that
// startClaimController can find it later.
func prepareClaimController(tCtx ktesting.TContext) ktesting.TContext {
	return tCtx.WithValue(claimControllerKey, &claimControllerSingleton{rootCtx: tCtx})
}
// startClaimController ensures that the ResourceClaim controller is running
// for the calling test. It looks up the singleton stored in the context by
// prepareClaimController and aborts the test if there is none.
//
// The singleton serializes start/stop with a mutex, so this may be used in
// parallel tests.
func startClaimController(tCtx ktesting.TContext) {
	tCtx.Helper()

	stored := tCtx.Value(claimControllerKey)
	if stored == nil {
		tCtx.Fatal("internal error: startClaimController without a prior prepareClaimController call")
	}
	stored.(*claimControllerSingleton).start(tCtx)
}
type schedulerSingleton struct {
rootCtx ktesting.TContext
@ -447,6 +472,84 @@ func newSchedulerComponentConfig(tCtx ktesting.TContext, cfgData string) *config
return &cfg
}
// claimControllerSingleton manages one shared, reference-counted instance of
// the ResourceClaim controller. Tests acquire it through start and release it
// through stop; the controller only runs while at least one test uses it.
type claimControllerSingleton struct {
	// rootCtx is the context in which the controller itself runs. It is set
	// once when the singleton gets created and outlives the individual tests
	// which call start.
	rootCtx ktesting.TContext

	// mutex serializes start and stop and thus protects all fields below.
	mutex sync.Mutex
	// usageCount is the number of tests which called start and have not
	// been cleaned up yet.
	usageCount int
	// cancel stops the running controller. It is nil until the controller
	// has been started for the first time.
	cancel func(cause string)
	// informerFactory provides the informers used by the running controller.
	// It is nil until the controller has been started for the first time.
	informerFactory informers.SharedInformerFactory
	// wg tracks the goroutine in which the controller runs.
	wg sync.WaitGroup
}
// start ensures that the ResourceClaim controller is running and registers a
// cleanup with the calling test which releases that test's reference again
// (see stop).
//
// The controller is shared: only the caller which raises the usage count from
// zero to one actually creates and runs it, all other callers merely increment
// the usage count. A failure to create the controller fails the calling test.
func (claimController *claimControllerSingleton) start(tCtx ktesting.TContext) {
	tCtx.Helper()
	claimController.mutex.Lock()
	defer claimController.mutex.Unlock()

	claimController.usageCount++
	tCtx.CleanupCtx(claimController.stop)
	if claimController.usageCount > 1 {
		// Already started earlier.
		return
	}

	// Remember the test which triggered the start before switching to the
	// root context below: log messages are meant to name that test, and a
	// setup failure must be reported through it because failing the root
	// (parent) test from within a subtest is not supported by the testing
	// package.
	testCtx := tCtx
	testName := tCtx.Name()

	// Run claimController with default configuration. This must use the root context because
	// the per-test tCtx passed to start will get canceled once the test which triggered
	// starting the claimController is done.
	tCtx = claimController.rootCtx
	tCtx.Logf("Starting the ResourceClaim controller for test %s...", testName)
	tCtx = tCtx.WithLogger(klog.LoggerWithName(tCtx.Logger(), "claimController"))
	claimControllerCtx := tCtx.WithCancel()
	claimController.cancel = claimControllerCtx.Cancel

	client := claimControllerCtx.Client()
	claimController.informerFactory = informers.NewSharedInformerFactory(client, 0 /* resync period */)
	controller, err := resourceclaim.NewController(
		klog.FromContext(claimControllerCtx),
		resourceclaim.Features{
			AdminAccess:     utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
			PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
		},
		client,
		claimController.informerFactory.Core().V1().Pods(),
		claimController.informerFactory.Resource().V1().ResourceClaims(),
		claimController.informerFactory.Resource().V1().ResourceClaimTemplates(),
	)
	testCtx.ExpectNoError(err, "create ResourceClaim controller")

	claimController.informerFactory.Start(claimControllerCtx.Done())
	claimController.wg.Go(func() {
		controller.Run(claimControllerCtx, 1 /* one worker to get more readable log output without interleaving */)
	})
	tCtx.Logf("Started the claimController for test %s.", testName)
}
// stop releases one reference to the shared ResourceClaim controller. It is
// registered as a cleanup by start, with tCtx identifying the test that is
// finishing.
//
// Only when the last reference is gone does it cancel the controller, shut
// down the informer factory and wait for the controller goroutine to return.
func (claimController *claimControllerSingleton) stop(tCtx ktesting.TContext) {
	claimController.mutex.Lock()
	defer claimController.mutex.Unlock()

	claimController.usageCount--
	if stillInUse := claimController.usageCount > 0; stillInUse {
		// Some other test continues to rely on the controller.
		return
	}

	// Log through the root context, but name the test which dropped the
	// last reference.
	claimController.rootCtx.Logf("Stopping the ResourceClaim controller after test %s...", tCtx.Name())

	// Both fields are nil if the controller was never started.
	if cancel := claimController.cancel; cancel != nil {
		cancel("test is done")
	}
	if factory := claimController.informerFactory; factory != nil {
		factory.Shutdown()
	}
	claimController.wg.Wait()
}
// claimControllerKeyType is the private type of the context key used for the
// ResourceClaim controller singleton.
type claimControllerKeyType int
// claimControllerKey is the context key under which prepareClaimController
// stores the *claimControllerSingleton and under which startClaimController
// looks it up.
var claimControllerKey claimControllerKeyType
// testPod creates a pod with a resource claim reference and then checks
// whether that field is or isn't getting dropped.
func testPod(tCtx ktesting.TContext, draEnabled bool) {

View file

@ -0,0 +1,172 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
"sync"
"time"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gstruct"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
drautils "k8s.io/kubernetes/test/e2e/dra/utils"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
// testShareResourceClaimSequentially is the former "sharing a claim sequentially" test which was
// removed from E2E testing in https://github.com/kubernetes/kubernetes/pull/133214.
//
// It creates a single ResourceClaim plus as many pods using it as the claim can be reserved for,
// waits for all of them to be scheduled, then creates some additional pods and checks that those
// are pending. After force-deleting the initial pods, the additional ones must get scheduled.
//
// Scheduling so many pods in an E2E test was problematic, but in an integration test it's fine
// because we control the environment.
func testShareResourceClaimSequentially(tCtx ktesting.TContext) {
	// Some E2E helpers still use Gomega directly.
	// TODO: rewrite those helpers to use TContext.
	// In the meantime, make Gomega work by running sequentially.
	gomega.RegisterTestingT(tCtx)
	tCtx.Cleanup(func() {
		gomega.RegisterFailHandler(nil)
	})

	tCtx = tCtx.WithNamespace(createTestNamespace(tCtx, nil))
	startScheduler(tCtx)
	startClaimController(tCtx)

	// Simulated driver: no real nodes and no kubelet are involved.
	nodes := drautils.NewNodesNow(tCtx, 1, 8)
	driver := drautils.NewDriverInstance(tCtx)
	driver.WithRealNodes = false
	driver.WithKubelet = false
	driver.Run(tCtx, "/no/kubelet/root", nodes, drautils.NetworkResources(1, false)(nodes))
	b := drautils.NewBuilderNow(tCtx, driver)

	// buildPods prepares count pods referencing the external claim
	// without creating them in the cluster yet.
	buildPods := func(count int) []*v1.Pod {
		built := make([]*v1.Pod, 0, count)
		for range count {
			built = append(built, b.PodExternal())
		}
		return built
	}

	// The claim comes first, followed by all pods using it.
	toCreate := []klog.KMetadata{b.ExternalClaim()}

	// This test used to test usage of the claim by one pod
	// at a time. After removing the "not sharable"
	// feature and bumping up the maximum number of
	// consumers this became a stress test.
	numMaxPods := resourceapi.ResourceClaimReservedForMaxSize
	tCtx.Logf("Creating %d pods sharing the same claim", numMaxPods)
	initialPods := buildPods(numMaxPods)
	for _, pod := range initialPods {
		toCreate = append(toCreate, pod)
	}
	b.Create(tCtx, toCreate...)

	// NOTE(review): the start timeout scales with the number of pods and
	// thus is very generous (hours) - confirm that this is intended.
	podStartTimeout := 5 * time.Minute * time.Duration(numMaxPods)
	ensureDuration := time.Minute // Don't check for too long, even if it is less precise.

	podIsScheduled := gomega.HaveField("Spec.NodeName", gomega.Not(gomega.BeEmpty()))
	unscheduledCondition := gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{
		"Type":   gomega.Equal(v1.PodScheduled),
		"Status": gomega.Equal(v1.ConditionFalse),
		// With the current scheduler code, creating too many pods is treated as an error,
		// so this would have to be "SchedulerError". Maybe it shouldn't be an error?
		// "Reason": gomega.Equal(v1.PodReasonUnschedulable),
	})
	podIsPending := gomega.And(
		gomega.HaveField("Status.Phase", gomega.Equal(v1.PodPending)),
		gomega.HaveField("Status.Conditions", gomega.ContainElement(unscheduledCondition)),
	)

	// waitForScheduled polls the pod until it has a node assigned.
	waitForScheduled := func(tCtx ktesting.TContext, pod *v1.Pod) {
		tCtx.Helper()
		getPod := tCtx.Client().CoreV1().Pods(pod.Namespace).Get
		tCtx.AssertEventually(getPod).
			WithArguments(pod.Name, metav1.GetOptions{}).
			WithTimeout(podStartTimeout).
			WithPolling(10*time.Second).
			Should(podIsScheduled, "Pod %s should get scheduled.", pod.Name)
	}

	// waitForPending polls the pod until it is pending with PodScheduled=False.
	//
	// NOTE(review): presumably this succeeds as soon as the pod is observed
	// in that state once, which is weaker than the "should remain pending"
	// message suggests - confirm whether a consistency check was intended.
	waitForPending := func(tCtx ktesting.TContext, pod *v1.Pod) {
		tCtx.Helper()
		getPod := tCtx.Client().CoreV1().Pods(pod.Namespace).Get
		tCtx.AssertEventually(getPod).
			WithArguments(pod.Name, metav1.GetOptions{}).
			WithTimeout(ensureDuration).
			WithPolling(10*time.Second).
			Should(podIsPending, "Pod %s should remain pending.", pod.Name)
	}

	// We don't know the order. All that matters is that all of them get scheduled eventually.
	tCtx.Logf("Waiting for %d pods to be scheduled", numMaxPods)
	var wg sync.WaitGroup
	for _, pod := range initialPods {
		wg.Go(func() {
			waitForScheduled(tCtx, pod)
		})
	}
	wg.Wait()
	if tCtx.Failed() {
		return
	}

	// TODO (?): check metrics about pod scheduling, speed up pod scheduling.
	// Currently pods go into backoff, so the initial 256 pods get scheduled much
	// more slowly than they could be.

	numMorePods := 10
	tCtx.Logf("Creating %d additional pods for the same claim", numMorePods)
	morePods := buildPods(numMorePods)
	toCreate = make([]klog.KMetadata, 0, numMorePods)
	for _, pod := range morePods {
		toCreate = append(toCreate, pod)
	}
	b.Create(tCtx, toCreate...)

	// None of the additional pods can run because of the ReservedFor limit.
	tCtx.Logf("Check for %s that the additional pods don't get scheduled", ensureDuration)
	for _, pod := range morePods {
		wg.Go(func() {
			waitForPending(tCtx, pod)
		})
	}
	wg.Wait()
	if tCtx.Failed() {
		return
	}

	// We need to force-delete (no kubelet!) each scheduled pod,
	// otherwise the new ones cannot use the claim.
	tCtx.Logf("Deleting the initial %d pods", numMaxPods)
	for _, pod := range initialPods {
		wg.Go(func() {
			force := metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(0))}
			err := tCtx.Client().CoreV1().Pods(pod.Namespace).Delete(tCtx, pod.Name, force)
			if !tCtx.AssertNoError(err) {
				return
			}
			// Should be almost (completely?) instantaneous.
			tCtx.AssertNoError(e2epod.WaitForPodNotFoundInNamespace(tCtx, tCtx.Client(), pod.Name, pod.Namespace, time.Second))
		})
	}
	wg.Wait()
	if tCtx.Failed() {
		return
	}

	// Now those should also get scheduled - eventually...
	tCtx.Logf("Waiting for the additional %d pods to be scheduled", numMorePods)
	for _, pod := range morePods {
		wg.Go(func() {
			waitForScheduled(tCtx, pod)
		})
	}
	wg.Wait()
}