diff --git a/pkg/scheduler/backend/cache/podgroupstate.go b/pkg/scheduler/backend/cache/podgroupstate.go index dc908b99863..3c6efb831a4 100644 --- a/pkg/scheduler/backend/cache/podgroupstate.go +++ b/pkg/scheduler/backend/cache/podgroupstate.go @@ -232,12 +232,56 @@ func (pgs *podGroupState) snapshot() *podGroupStateSnapshot { // empty returns true when the group contains no pods. // It must be called under the cache lock. func (pgs *podGroupState) empty() bool { + pgs.lock.RLock() + defer pgs.lock.RUnlock() + return pgs.podGroupStateData.empty() } -// forgetPod moves a pod back from the assumed state to unscheduled. +// addPod adds the pod to this group. +// Depending on the NodeName, it can insert the pod into either assignedPods or unscheduledPods. +// It must be called under the cache lock. +func (pgs *podGroupState) addPod(pod *v1.Pod) { + pgs.lock.Lock() + defer pgs.lock.Unlock() + + pgs.podGroupStateData.addPod(pod) +} + +// updatePod updates the pod in this group. +// In case of binding, it moves the pod to assignedPods. +// It must be called under the cache lock. +func (pgs *podGroupState) updatePod(oldPod, newPod *v1.Pod) { + pgs.lock.Lock() + defer pgs.lock.Unlock() + + pgs.podGroupStateData.updatePod(oldPod, newPod) +} + +// deletePod removes the pod from this pod group state. +// It must be called under the cache lock. +func (pgs *podGroupState) deletePod(podUID types.UID) { + pgs.lock.Lock() + defer pgs.lock.Unlock() + + pgs.podGroupStateData.deletePod(podUID) +} + +// assumePod marks a pod as assumed within the pod group state. +// It must be called under the cache lock. +func (pgs *podGroupState) assumePod(pod *v1.Pod) { + pgs.lock.Lock() + defer pgs.lock.Unlock() + + pgs.podGroupStateData.assumePod(pod) +} + +// forgetPod moves a pod back from the assumed state to unscheduled within the pod group state. // It must be called under the cache lock. func (pgs *podGroupState) forgetPod(podUID types.UID) { + pgs.lock.Lock() + defer pgs.lock.Unlock() + pgs.podGroupStateData.forgetPod(podUID) } diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 6d250d818e9..d5a4cfbfa57 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -2655,7 +2655,7 @@ var _ fwk.PostBindPlugin = &PostBindPlugin{} type JobPlugin struct { podLister listersv1.PodLister - podsActivated bool + podsActivated chan struct{} } func (j *JobPlugin) Name() string { @@ -2696,7 +2696,7 @@ func (j *JobPlugin) PostBind(_ context.Context, state fwk.CycleState, p *v1.Pod, s.Map[namespacedName] = pod } s.Unlock() - j.podsActivated = true + j.podsActivated <- struct{}{} } } } @@ -2710,7 +2710,10 @@ func TestActivatePods(t *testing.T) { var jobPlugin *JobPlugin // Create a plugin registry for testing. Register a Job plugin. registry := frameworkruntime.Registry{jobPluginName: func(_ context.Context, _ runtime.Object, fh fwk.Handle) (fwk.Plugin, error) { - jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()} + jobPlugin = &JobPlugin{ + podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(), + podsActivated: make(chan struct{}, 10), + } return jobPlugin, nil }} @@ -2775,8 +2778,10 @@ func TestActivatePods(t *testing.T) { } // Lastly verify the pods activation logic is really called. - if jobPlugin.podsActivated == false { - t.Errorf("JobPlugin's pods activation logic is not called") + select { + case <-jobPlugin.podsActivated: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("JobPlugin's pods activation logic wasn't called") } }