From 703642baf653b14f1a614fec37d1a27c98028f19 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 16 Jan 2026 08:39:02 +0100 Subject: [PATCH 1/4] DRA scheduler: fix one root cause of double device allocation DRA depends on the assume cache having invoked all event handlers before Assume() returns, because DRA maintains state that is relevant for scheduling through those event handlers. This log snippet shows how this went wrong during PreBind: dynamicresources.go:1150: I0115 10:35:29.264437] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 resourceVersion="5636" dra_manager.go:198: I0115 10:35:29.264448] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 version="287" dynamicresources.go:1157: I0115 10:35:29.264463] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 resourceVersion="5636" allocation=< ... allocateddevices.go:189: I0115 10:35:29.267315] scheduler: Observed device allocation device="testdra-all-usesallresources-kqjpj.driver/worker-1/worker-1-device-096" claim="testdra-all-usesallresources-kqjpj/claim-0091" - goroutine #1: UpdateStatus result delivered via informer. AssumeCache updates cache, pushes event A, emitEvents pulls event A from queue. *Not* done with delivering it yet! - goroutine #2: AssumeCache.Assume called. Updates cache, pushes event B, emits it. Old and new claim have allocation, so no "Observed device allocation". - goroutine #3: Schedules next pod, without considering device as allocated (not in the log snippet). - goroutine #1: Finally delivers event A: "Observed device allocation", but too late. Also, events are delivered out-of-order. The fix is to let emitEvents when called by Assume wait for a potentially running emitEvents in some other goroutine, thus ensuring that an event pulled out of the queue by that other goroutine got delivered before Assume itself checks the queue one more time and then returns. The time window were things go wrong is small. An E2E test covering this only flaked rarely, and only in the CI. An integration test (separate commit) with higher number of pods finally made it possible to reproduce locally. It also uncovered a second race (fix in separate commit). The unit test fails without the fix: === RUN TestAssumeConcurrency assume_cache_test.go:311: FATAL ERROR: Assume should have blocked and didn't. --- FAIL: TestAssumeConcurrency (0.00s) --- .../util/assumecache/assume_cache.go | 32 ++++++++++- .../util/assumecache/assume_cache_test.go | 57 +++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 889cb9efe3b..fae3230bc3a 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -124,6 +124,9 @@ type AssumeCache struct { // Synchronizes updates to all fields below. rwMutex sync.RWMutex + // cond is used by emitEvents. + cond *sync.Cond + // All registered event handlers. eventHandlers []cache.ResourceEventHandler handlerRegistration cache.ResourceEventHandlerRegistration @@ -149,6 +152,9 @@ type AssumeCache struct { // of events would no longer be guaranteed. eventQueue buffer.Ring[func()] + // emittingEvents is true while one emitEvents call is actively emitting events. + emittingEvents bool + // describes the object stored description string @@ -196,6 +202,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam indexName: indexName, eventQueue: *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}), } + c.cond = sync.NewCond(&c.rwMutex) indexers := cache.Indexers{} if indexName != "" && indexFunc != nil { indexers[indexName] = c.objInfoIndexFunc @@ -508,8 +515,31 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache. } // emitEvents delivers all pending events that are in the queue, in the order -// in which they were stored there (FIFO). +// in which they were stored there (FIFO). Only one goroutine at a time is +// delivering events, to ensure correct order. func (c *AssumeCache) emitEvents() { + c.rwMutex.Lock() + for c.emittingEvents { + // Wait for the active caller of emitEvents to finish. + // When it is done, it may or may not have drained + // the events pushed by our caller. + // We'll check below ourselves. + c.cond.Wait() + } + c.emittingEvents = true + c.rwMutex.Unlock() + + defer func() { + c.rwMutex.Lock() + c.emittingEvents = false + // Hand over the batton to one other goroutine, if there is one. + // We don't need to wake up more than one because only one of + // them would be able to grab the "emittingEvents" responsibility. + c.cond.Signal() + c.rwMutex.Unlock() + }() + + // When we get here, this instance of emitEvents is the active one. for { c.rwMutex.Lock() deliver, ok := c.eventQueue.ReadOne() diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index a18d51e0a87..dfda14f5304 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -265,6 +265,63 @@ func TestAssume(t *testing.T) { } } +// TestAssumeRace simulates this sequence of events: +// - Informer update arrives, event handler gets invoked and is slow. +// - Assume for the same object is called. It must block until +// the informer-triggered event is delivered. +func TestAssumeRace(t *testing.T) { ktesting.Init(t).SyncTest("", testAssumeRace) } +func testAssumeRace(tCtx ktesting.TContext) { + var informer testInformer + testObj := makeObj("pvc1", "1", "") + ac := NewAssumeCache(tCtx.Logger(), &informer, "TestObject", "", nil) + blockEvent := tCtx.WithCancel() + defer blockEvent.Cancel("test done") + eventBlocked := false + eventDone := false + ac.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + eventBlocked = true + <-blockEvent.Done() + eventDone = true + }, + }) + + // Here a real client with to a Create. What the assume cache may or may not + // see before Assume is the new object from the informer - let's pretend that + // comes first. + go informer.add(testObj) + + // Wait for processing to finish. + tCtx.Wait() + if !eventBlocked { + tCtx.Fatal("Event handler should have been called and wasn't.") + } + + // Assume should block until we unblock the event delivery. + assumeDone := false + go func() { + err := ac.Assume(testObj) + tCtx.AssertNoError(err, "Assume failed") + assumeDone = true + }() + + // Wait for Assume to be blocked in its implementation. + tCtx.Wait() + if assumeDone { + tCtx.Fatal("Assume should have blocked and didn't.") + } + + // Unblock both goroutines. + blockEvent.Cancel("proceed") + tCtx.Wait() + if !eventDone { + tCtx.Fatal("Event should have been delivered and wasn't.") + } + if !assumeDone { + tCtx.Fatal("Assume should have returned and didn't.") + } +} + func TestRestore(t *testing.T) { tCtx, cache, informer := newTest(t) var events mockEventHandler From 581ee0a2eca3eb7cbeeb49099c784c2555136b5e Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 16 Jan 2026 07:51:14 +0100 Subject: [PATCH 2/4] DRA scheduler: fix another root cause of double device allocation GatherAllocatedState and ListAllAllocatedDevices need to collect information from different sources (allocated devices, in-flight claims), potentially even multiple times (GatherAllocatedState first gets allocated devices, then the capacities). The underlying assumption that nothing bad happens in parallel is not always true. The following log snippet shows how an update of the assume cache (feeding the allocated devices tracker) and in-flight claims lands such that GatherAllocatedState doesn't see the device in that claim as allocated: dra_manager.go:263: I0115 15:11:04.407714 18778] scheduler: Starting GatherAllocatedState ... allocateddevices.go:189: I0115 15:11:04.407945 18066] scheduler: Observed device allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-094" claim="testdra-all-usesallresources-hvs5d/claim-0553" dynamicresources.go:1150: I0115 15:11:04.407981 89109] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=: a84d3c4d-f752-4cfd-8993-f4ce58643685 resourceVersion="5680" dra_manager.go:201: I0115 15:11:04.408008 89109] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=: a84d3c4d-f752-4cfd-8993-f4ce58643685 version="1211" dynamicresources.go:1157: I0115 15:11:04.408044 89109] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=: a84d3c4d-f752-4cfd-8993-f4ce58643685 resourceVersion="5680" allocation=< { "devices": { "results": [ { "request": "req-1", "driver": "testdra-all-usesallresources-hvs5d.driver", "pool": "worker-5", "device": "worker-5-device-094" } ] }, "nodeSelector": { "nodeSelectorTerms": [ { "matchFields": [ { "key": "metadata.name", "operator": "In", "values": [ "worker-5" ] } ] } ] }, "allocationTimestamp": "2026-01-15T14:11:04Z" } > dra_manager.go:280: I0115 15:11:04.408085 18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-095" claim="testdra-all-usesallresources-hvs5d/claim-0086" dra_manager.go:280: I0115 15:11:04.408137 18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-096" claim="testdra-all-usesallresources-hvs5d/claim-0165" default_binder.go:69: I0115 15:11:04.408175 89109] scheduler: Attempting to bind pod to node pod="testdra-all-usesallresources-hvs5d/my-pod-0553" node="worker-5" dra_manager.go:265: I0115 15:11:04.408264 18778] scheduler: Finished GatherAllocatedState allocatedDevices=: { Initial state: "worker-5-device-094" is in-flight, not in cache - goroutine #1: starts GatherAllocatedState, copies cache - goroutine #2: adds to assume cache, removes from in-flight - goroutine #1: checks in-flight => device never seen as allocated This is the second reason for double allocation of the same device in two different claims. The other was timing in the assume cache. Both were tracked down with an integration test (separate commit). It did not fail all the time, but enough that regressions should show up as flakes. --- .../dynamicresources/allocateddevices.go | 33 +++++++++-- .../plugins/dynamicresources/dra_manager.go | 56 +++++++++++++++---- .../dynamicresources/dynamicresources.go | 54 ++++++++++++------ 3 files changed, 111 insertions(+), 32 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go index 24d6f8c97c3..21d192ffc89 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go @@ -85,11 +85,18 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, // This is cheaper than repeatedly calling List, making strings unique, and building the set // each time PreFilter is called. // +// To simplify detecting concurrent changes, each modification bumps a revision counter, +// similar to ResourceVersion in the apiserver. Get and Capacities include the +// current value in their result. A caller than can compare againt the current value +// to determine whether some prior results are still up-to-date, without having to get +// and compare them. +// // All methods are thread-safe. Get returns a cloned set. type allocatedDevices struct { logger klog.Logger mutex sync.RWMutex + revision int64 ids sets.Set[structured.DeviceID] shareIDs sets.Set[structured.SharedDeviceID] capacities structured.ConsumedCapacityCollection @@ -106,18 +113,25 @@ func newAllocatedDevices(logger klog.Logger) *allocatedDevices { } } -func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] { +func (a *allocatedDevices) Get() (sets.Set[structured.DeviceID], int64) { a.mutex.RLock() defer a.mutex.RUnlock() - return a.ids.Clone() + return a.ids.Clone(), a.revision } -func (a *allocatedDevices) Capacities() structured.ConsumedCapacityCollection { +func (a *allocatedDevices) Capacities() (structured.ConsumedCapacityCollection, int64) { a.mutex.RLock() defer a.mutex.RUnlock() - return a.capacities.Clone() + return a.capacities.Clone(), a.revision +} + +func (a *allocatedDevices) Revision() int64 { + a.mutex.RLock() + defer a.mutex.RUnlock() + + return a.revision } func (a *allocatedDevices) handlers() cache.ResourceEventHandler { @@ -200,8 +214,13 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) { }, ) + if len(deviceIDs) == 0 && len(shareIDs) == 0 && len(deviceCapacities) == 0 { + return + } + a.mutex.Lock() defer a.mutex.Unlock() + a.revision++ for _, deviceID := range deviceIDs { a.ids.Insert(deviceID) } @@ -241,8 +260,14 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) { a.logger.V(6).Info("Observed consumed capacity release", "device id", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim)) deviceCapacities = append(deviceCapacities, capacity) }) + + if len(deviceIDs) == 0 && len(shareIDs) == 0 && len(deviceCapacities) == 0 { + return + } + a.mutex.Lock() defer a.mutex.Unlock() + a.revision++ for _, deviceID := range deviceIDs { a.ids.Delete(deviceID) } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go index 42b54c08e1f..2be14d10ec6 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go @@ -18,6 +18,7 @@ package dynamicresources import ( "context" + "errors" "fmt" "sync" @@ -218,10 +219,29 @@ func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) { return result, nil } +// errClaimTrackerConcurrentModification gets returned if ListAllAllocatedDevices +// or GatherAllocatedState need to be retried. +// +// There is a rare race when a claim is initially in-flight: +// - allocated is created from cache (claim not there) +// - someone removes from the in-flight claims and adds to the cache +// - we start checking in-flight claims (claim not there anymore) +// => claim ignored +// +// A proper fix would be to rewrite the assume cache, allocatedDevices, +// and the in-flight map so that they are under a single lock. But that's +// a pretty big change and prevents reusing the assume cache. So instead +// we check for changes in the set of allocated devices and keep trying +// until we get an attempt with no concurrent changes. +// +// A claim being first in the cache, then only in-flight cannot happen, +// so we don't need to re-check the in-flight claims. +var errClaimTrackerConcurrentModification = errors.New("conflicting concurrent modification") + func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) { // Start with a fresh set that matches the current known state of the // world according to the informers. - allocated := c.allocatedDevices.Get() + allocated, revision := c.allocatedDevices.Get() // Whatever is in flight also has to be checked. c.inFlightAllocations.Range(func(key, value any) bool { @@ -232,16 +252,26 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], }, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {}) return true }) - // There's no reason to return an error in this implementation, but the error might be helpful for other implementations. - return allocated, nil + + if revision == c.allocatedDevices.Revision() { + // Our current result is valid, nothing changed in the meantime. + return allocated, nil + } + + return nil, errClaimTrackerConcurrentModification } func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error) { // Start with a fresh set that matches the current known state of the // world according to the informers. - allocated := c.allocatedDevices.Get() + allocated, revision1 := c.allocatedDevices.Get() allocatedSharedDeviceIDs := sets.New[structured.SharedDeviceID]() - aggregatedCapacity := c.allocatedDevices.Capacities() + aggregatedCapacity, revision2 := c.allocatedDevices.Capacities() + + if revision1 != revision2 { + // Already not consistent. Try again. + return nil, errClaimTrackerConcurrentModification + } enabledConsumableCapacity := utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity) @@ -263,12 +293,16 @@ func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error return true }) - // There's no reason to return an error in this implementation, but the error might be helpful for other implementations. - return &structured.AllocatedState{ - AllocatedDevices: allocated, - AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs, - AggregatedCapacity: aggregatedCapacity, - }, nil + if revision1 == c.allocatedDevices.Revision() { + // Our current result is valid, nothing changed in the meantime. + return &structured.AllocatedState{ + AllocatedDevices: allocated, + AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs, + AggregatedCapacity: aggregatedCapacity, + }, nil + } + + return nil, errClaimTrackerConcurrentModification } func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error { diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 8f89de765e8..e1e3466c266 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -518,25 +518,45 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. + // + // This might have to be retried in the unlikely case that some concurrent modification made + // the result invalid. var allocatedState *structured.AllocatedState - if pl.fts.EnableDRAConsumableCapacity { - allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState() - if err != nil { - return nil, statusError(logger, err) - } - if allocatedState == nil { - return nil, statusError(logger, errors.New("nil allocated state")) - } - } else { - allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices() - if err != nil { - return nil, statusError(logger, err) - } - allocatedState = &structured.AllocatedState{ - AllocatedDevices: allocatedDevices, - AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](), - AggregatedCapacity: structured.NewConsumedCapacityCollection(), + err = wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) { + if pl.fts.EnableDRAConsumableCapacity { + allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState() + if err != nil { + if errors.Is(err, errClaimTrackerConcurrentModification) { + logger.V(6).Info("Conflicting modification during GatherAllocatedState, trying again") + return false, nil + } + return false, err + } + if allocatedState == nil { + return false, errors.New("nil allocated state") + } + // Done. + return true, nil + } else { + allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices() + if err != nil { + if errors.Is(err, errClaimTrackerConcurrentModification) { + logger.V(6).Info("Conflicting modification during ListAllAllocatedDevices, trying again") + return false, nil + } + return false, err + } + allocatedState = &structured.AllocatedState{ + AllocatedDevices: allocatedDevices, + AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](), + AggregatedCapacity: structured.NewConsumedCapacityCollection(), + } + // Done. + return true, nil } + }) + if err != nil { + return nil, statusError(logger, fmt.Errorf("gather allocation state: %w", err)) } slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules() if err != nil { From 2198d96520d03862eb28b98b266f7ecd2e1f89d4 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 14 Jan 2026 19:48:43 +0100 Subject: [PATCH 3/4] DRA integration: add "uses all resources" test This corresponds to an E2E test which sometimes (but very rarely) flaked in the CI. --- test/integration/dra/dra_test.go | 3 + .../dra/uses_all_resources_test.go | 90 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 test/integration/dra/uses_all_resources_test.go diff --git a/test/integration/dra/dra_test.go b/test/integration/dra/dra_test.go index 347006d70df..fa8a4e8a61e 100644 --- a/test/integration/dra/dra_test.go +++ b/test/integration/dra/dra_test.go @@ -154,6 +154,7 @@ func TestDRA(t *testing.T) { // Number of devices per slice is chosen so that Filter takes a few seconds: // without a timeout, the test doesn't run too long, but long enough that a short timeout triggers. tCtx.Run("FilterTimeout", func(tCtx ktesting.TContext) { testFilterTimeout(tCtx, 9) }) + tCtx.Run("UsesAllResources", testUsesAllResources) }, }, "GA": { @@ -176,6 +177,7 @@ func TestDRA(t *testing.T) { tCtx = tCtx.WithNamespace(namespace) TestCreateResourceSlices(tCtx, 100) }) + tCtx.Run("UsesAllResources", testUsesAllResources) }, }, "v1beta1": { @@ -249,6 +251,7 @@ func TestDRA(t *testing.T) { // in the experimental channel has an improvement that requires a higher number here than // in the incubating and stable channels. tCtx.Run("FilterTimeout", func(tCtx ktesting.TContext) { testFilterTimeout(tCtx, 20) }) + tCtx.Run("UsesAllResources", testUsesAllResources) }, }, } { diff --git a/test/integration/dra/uses_all_resources_test.go b/test/integration/dra/uses_all_resources_test.go new file mode 100644 index 00000000000..1cd6164451d --- /dev/null +++ b/test/integration/dra/uses_all_resources_test.go @@ -0,0 +1,90 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "fmt" + "time" + + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/dynamic-resource-allocation/structured" + "k8s.io/klog/v2" + st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/test/utils/ktesting" +) + +func testUsesAllResources(tCtx ktesting.TContext) { + tCtx.Parallel() + namespace := createTestNamespace(tCtx, nil) + nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{}) + tCtx.ExpectNoError(err, "list nodes") + numDevicesPerNode := 100 // One pod gets scheduled per device. + + class, driverName := createTestClass(tCtx, namespace) + for _, node := range nodes.Items { + // Globally unique device names make debugging simpler... + devices := make([]string, numDevicesPerNode) + for i := range numDevicesPerNode { + devices[i] = fmt.Sprintf("%s-device-%03d", node.Name, i) + } + slice := st.MakeResourceSlice(node.Name, driverName).Devices(devices...) + createSlice(tCtx, slice.Obj()) + } + + var claims []*resourceapi.ResourceClaim + var pods []*v1.Pod + for i := range len(nodes.Items) * numDevicesPerNode { + tCtx := tCtx.WithStep(fmt.Sprintf("#%04d", i)) + + claim := st.MakeResourceClaim(). + Name(fmt.Sprintf("claim-%04d", i)). + Namespace(namespace). + Request(class.Name). + Obj() + claim, err := tCtx.Client().ResourceV1().ResourceClaims(namespace).Create(tCtx, claim, metav1.CreateOptions{}) + tCtx.ExpectNoError(err, "create claim") + claims = append(claims, claim) + + pod := createPod(tCtx, namespace, fmt.Sprintf("-%04d", i), podWithClaimName, claim) + pods = append(pods, pod) + } + + startScheduler(tCtx) + + // Eventually, all pods should be scheduled and thus all claims allocated. + allocated := make(map[structured.DeviceID]*resourceapi.ResourceClaim, len(claims)) + tCtx = tCtx.WithStep("check claim allocation").WithTimeout(time.Duration(len(pods))*5*time.Second, "scheduling timeout for all pods") + for _, claim := range claims { + var actualClaim *resourceapi.ResourceClaim + tCtx.Eventually(func(tCtx ktesting.TContext) (*resourceapi.ResourceClaim, error) { + c, err := tCtx.Client().ResourceV1().ResourceClaims(claim.Namespace).Get(tCtx, claim.Name, metav1.GetOptions{}) + actualClaim = c + return c, err + }).Should(gomega.HaveField("Status.Allocation", gomega.Not(gomega.BeNil()))) + tCtx.Expect(actualClaim.Status.Allocation.Devices.Results).To(gomega.HaveLen(1)) + result := actualClaim.Status.Allocation.Devices.Results[0] + id := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + if otherClaim, ok := allocated[id]; ok { + tCtx.Fatalf("device %s was allocated to claims %s and %s", id, klog.KObj(actualClaim), klog.KObj(otherClaim)) + } + allocated[id] = actualClaim + } +} From 001ec49eb6725b11dc1a2f4820409eaad2b55f11 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 19 Jan 2026 15:09:36 +0100 Subject: [PATCH 4/4] DRA integration: more pods per node, more parallelism Long running tests like TestDRA/all/DeviceBindingConditions (42.50s) should run in parallel with other tests, otherwise the overall runtime is too high. This then must allow more pods per node to avoid blocking scheduling. --- test/integration/dra/binding_conditions_test.go | 2 ++ test/integration/dra/dra_test.go | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/test/integration/dra/binding_conditions_test.go b/test/integration/dra/binding_conditions_test.go index d37bf875231..ab5baef46e8 100644 --- a/test/integration/dra/binding_conditions_test.go +++ b/test/integration/dra/binding_conditions_test.go @@ -42,6 +42,8 @@ var ( // testDeviceBindingConditions is the entry point for running each integration test that verifies DeviceBindingConditions. // Some of these tests use device taints, and they assume that DRADeviceTaints is enabled. func testDeviceBindingConditions(tCtx ktesting.TContext, enabled bool) { + tCtx.Parallel() + tCtx.Run("BasicFlow", func(tCtx ktesting.TContext) { testDeviceBindingConditionsBasicFlow(tCtx, enabled) }) if enabled { tCtx.Run("FailureTaints", func(tCtx ktesting.TContext) { testDeviceBindingFailureConditionsReschedule(tCtx, true) }) diff --git a/test/integration/dra/dra_test.go b/test/integration/dra/dra_test.go index fa8a4e8a61e..3a4e7287084 100644 --- a/test/integration/dra/dra_test.go +++ b/test/integration/dra/dra_test.go @@ -118,8 +118,11 @@ var ( Namespace(namespace). RequestWithPrioritizedList(st.SubRequest("subreq-1", className, 1)). Obj() +) - numNodes = 8 +const ( + numNodes = 8 + maxPodsPerNode = 5000 // This should never be the limiting factor, no matter how many tests run in parallel. ) func TestDRA(t *testing.T) { @@ -233,6 +236,8 @@ func TestDRA(t *testing.T) { features.DRAExtendedResource: true, }, f: func(tCtx ktesting.TContext) { + // These tests must run in parallel as much as possible to keep overall runtime low! + tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, true) }) tCtx.Run("Convert", testConvert) tCtx.Run("ControllerManagerMetrics", testControllerManagerMetrics) @@ -312,7 +317,7 @@ func createNodes(tCtx ktesting.TContext) { Capacity: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("100"), v1.ResourceMemory: resource.MustParse("1000"), - v1.ResourcePods: resource.MustParse("100"), + v1.ResourcePods: *resource.NewScaledQuantity(maxPodsPerNode, 0), }, Phase: v1.NodeRunning, Conditions: []v1.NodeCondition{ @@ -489,6 +494,8 @@ func testConvert(tCtx ktesting.TContext) { // when the AdminAccess feature is enabled, it also checks that the field // is only allowed to be used in namespace with the Resource Admin Access label func testAdminAccess(tCtx ktesting.TContext, adminAccessEnabled bool) { + tCtx.Parallel() + namespace := createTestNamespace(tCtx, nil) claim1 := claim.DeepCopy() claim1.Namespace = namespace @@ -1189,6 +1196,8 @@ func testPublishResourceSlices(tCtx ktesting.TContext, haveLatestAPI bool, disab // // When enabled, it tries server-side-apply (SSA) with different clients. This is what DRA drivers should be using. func testResourceClaimDeviceStatus(tCtx ktesting.TContext, enabled bool) { + tCtx.Parallel() + namespace := createTestNamespace(tCtx, nil) claim := &resourceapi.ResourceClaim{ @@ -1383,7 +1392,8 @@ func testMaxResourceSlice(tCtx ktesting.TContext) { } } -// testControllerManagerMetrics tests ResourceClaim metrics +// testControllerManagerMetrics tests ResourceClaim metrics. +// It must run sequentially. func testControllerManagerMetrics(tCtx ktesting.TContext) { namespace := createTestNamespace(tCtx, nil) class, _ := createTestClass(tCtx, namespace)