From 2a026f6d6597b863d4d10e2c3c853b41f057d602 Mon Sep 17 00:00:00 2001 From: yliao Date: Wed, 30 Jul 2025 17:59:03 +0000 Subject: [PATCH] 1/ added retries to AssumeClaimAfterAPICall for a claim that is not yet present in the assume cache (dynamicresources.go). 2/ modified the assume cache verification so that it does not report an error as long as the expected claim is in the cache, regardless of whether its latest object and API object differ (dynamicresources_test.go). 3/ fixed the nil pointer panic seen in https://prow.k8s.io/view/gs/kubernetes-ci-logs/pr-logs/pull/133321/pull-kubernetes-integration/1952472629470302208 --- .../dynamicresources/dynamicresources.go | 30 +++++++++- .../dynamicresources/dynamicresources_test.go | 55 +++++++++++++++---- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 3cd457a9ea9..df33e286ae0 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/util/slice" "k8s.io/utils/ptr" ) @@ -76,6 +77,10 @@ const ( // BindingTimeoutDefaultSeconds is the default timeout for waiting for // BindingConditions to be ready. BindingTimeoutDefaultSeconds = 600 + + // AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting + // for the extended resource claim to be updated in assumed cache. + AssumeExtendedResourceTimeoutDefaultSeconds = 120 ) // The state is initialized in PreFilter phase. 
Because we save the pointer in @@ -1446,8 +1451,28 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind if finalErr == nil { // This can fail, but only for reasons that are okay (concurrent delete or update). // Shouldn't happen in this case. + if isExtendedResourceClaim { + // Unlike other claims, extended resource claim is created in API server below. + // AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached assumed cache yet. + // Hence we must poll and wait for it. + pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true, + func(ctx context.Context) (bool, error) { + if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { + if errors.Is(err, assumecache.ErrNotFound) { + return false, nil + } + logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err) + return false, err + } + return true, nil + }) + if pollErr != nil { + logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr) + } + } + } else { if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { - logger.V(5).Info("Claim not stored in assume cache", "err", finalErr) + logger.V(5).Info("Claim not stored in assume cache", "err", err) } } for _, claimUID := range claimUIDs { @@ -1576,6 +1601,9 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind // and no binding failure conditions are true, // which includes the case that there are no binding conditions. 
func (pl *DynamicResources) isClaimReadyForBinding(claim *resourceapi.ResourceClaim) (bool, error) { + if claim.Status.Allocation == nil { + return false, nil + } for _, deviceRequest := range claim.Status.Allocation.Devices.Results { if len(deviceRequest.BindingConditions) == 0 { continue diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index a86bbcdcef6..6dbf98e40ce 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -37,6 +37,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" cgotesting "k8s.io/client-go/testing" @@ -1317,7 +1318,7 @@ func TestPlugin(t *testing.T) { }, postfilter: result{ status: fwk.NewStatus(fwk.Unschedulable, `deletion of ResourceClaim completed`), - removed: []metav1.Object{extendedResourceClaimNode2}, + removed: []metav1.Object{extendedResourceClaim}, }, }, }, @@ -1326,6 +1327,7 @@ func TestPlugin(t *testing.T) { pod: podWithExtendedResourceName, claims: []*resourceapi.ResourceClaim{extendedResourceClaimNode2}, classes: []*resourceapi.DeviceClass{deviceClassWithExtendResourceName}, + objs: []apiruntime.Object{workerNodeSlice, podWithExtendedResourceName}, want: want{ filter: perNodeResult{ workerNode.Name: { @@ -1827,10 +1829,6 @@ func TestPlugin(t *testing.T) { initialObjects = testCtx.listAll(t) testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name) t.Run("unreserverAfterBindFailure", func(t *testing.T) { - // in case we delete the claim API object - // wait for assumed cache to sync with informer - // then assumed cache should be empty - time.Sleep(800 * time.Millisecond) testCtx.verify(t, 
*tc.want.unreserveAfterBindFailure, initialObjects, nil, status) }) } else if status.IsSuccess() { @@ -1905,9 +1903,42 @@ func (tc *testContext) verify(t *testing.T, expected result, initialObjects []me if expected.assumedClaim != nil { expectAssumedClaims = append(expectAssumedClaims, expected.assumedClaim) } - actualAssumedClaims := tc.listAssumedClaims() - if diff := cmp.Diff(expectAssumedClaims, actualAssumedClaims, ignoreFieldsInResourceClaims...); diff != "" { - t.Errorf("Assumed claims are different (- expected, + actual):\n%s", diff) + // actualAssumedClaims are the claims in the assumed cache whose latest object differs from the API object. + // sameAssumedClaims are the claims in the assumed cache whose latest object and API object are the same. + actualAssumedClaims, sameAssumedClaims := tc.listAssumedClaims() + + // When no assumed claims are expected, it is an error if any claim's latest object still differs from its API object. + if len(expectAssumedClaims) == 0 && len(actualAssumedClaims) != 0 { + // In case we delete the claim API object, wait for assumed cache to sync with informer, + // then assumed cache should be empty. + err := wait.PollUntilContextTimeout(tc.ctx, 200*time.Millisecond, time.Minute, true, + func(ctx context.Context) (bool, error) { + actualAssumedClaims, sameAssumedClaims = tc.listAssumedClaims() + return len(actualAssumedClaims) == 0, nil + }) + if err != nil || len(actualAssumedClaims) != 0 { + t.Errorf("Assumed claims are different, err=%v, expected: nil, actual:\n%v", err, actualAssumedClaims) + } + } + if len(expectAssumedClaims) > 0 { + // It is not an error as long as the expected claim is present in the assumed cache, + // regardless of whether its latest object and API object differ. + for _, expected := range expectAssumedClaims { + seen := false + for _, actual := range actualAssumedClaims { + if cmp.Equal(expected, actual, ignoreFieldsInResourceClaims...) { + seen = true + } + } + for _, same := range sameAssumedClaims { + if cmp.Equal(expected, same, ignoreFieldsInResourceClaims...) 
{ + seen = true + } + } + if !seen { + t.Errorf("Assumed claims are different, expected: %v not found", expected) + } + } } var expectInFlightClaims []metav1.Object @@ -1932,18 +1963,22 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) { return } -func (tc *testContext) listAssumedClaims() []metav1.Object { +func (tc *testContext) listAssumedClaims() ([]metav1.Object, []metav1.Object) { var assumedClaims []metav1.Object + var sameClaims []metav1.Object for _, obj := range tc.draManager.resourceClaimTracker.cache.List(nil) { claim := obj.(*resourceapi.ResourceClaim) obj, _ := tc.draManager.resourceClaimTracker.cache.Get(claim.Namespace + "/" + claim.Name) apiObj, _ := tc.draManager.resourceClaimTracker.cache.GetAPIObj(claim.Namespace + "/" + claim.Name) if obj != apiObj { assumedClaims = append(assumedClaims, claim) + } else { + sameClaims = append(sameClaims, claim) } } sortObjects(assumedClaims) - return assumedClaims + sortObjects(sameClaims) + return assumedClaims, sameClaims } func (tc *testContext) listInFlightClaims() []metav1.Object {