mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Merge pull request #137641 from helayoty/helayoty/protection-controller-podgroup
KEP-5832: Add protection controller for PodGroup
This commit is contained in:
commit
1b5bcf309c
21 changed files with 1372 additions and 5 deletions
|
|
@ -194,6 +194,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor {
|
|||
register(newGarbageCollectorControllerDescriptor())
|
||||
register(newDaemonSetControllerDescriptor())
|
||||
register(newJobControllerDescriptor())
|
||||
register(newPodGroupProtectionControllerDescriptor())
|
||||
register(newDeploymentControllerDescriptor())
|
||||
register(newReplicaSetControllerDescriptor())
|
||||
register(newHorizontalPodAutoscalerControllerDescriptor())
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ func TestControllerNamesDeclaration(t *testing.T) {
|
|||
names.GarbageCollectorController,
|
||||
names.DaemonSetController,
|
||||
names.JobController,
|
||||
names.PodGroupProtectionController,
|
||||
names.DeploymentController,
|
||||
names.ReplicaSetController,
|
||||
names.HorizontalPodAutoscalerController,
|
||||
|
|
|
|||
64
cmd/kube-controller-manager/app/scheduling.go
Normal file
64
cmd/kube-controller-manager/app/scheduling.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/featuregate"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/scheduling/podgroupprotection"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
func newPodGroupProtectionControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PodGroupProtectionController,
|
||||
constructor: newPodGroupProtectionController,
|
||||
requiredFeatureGates: []featuregate.Feature{
|
||||
features.GenericWorkload,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newPodGroupProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
|
||||
return nil, nil
|
||||
}
|
||||
client, err := controllerContext.NewClient(controllerName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pgProtectionController, err := podgroupprotection.NewPodGroupProtectionController(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.InformerFactory.Scheduling().V1alpha2().PodGroups(),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
client,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to init %s: %w", controllerName, err)
|
||||
}
|
||||
|
||||
return newControllerLoop(func(ctx context.Context) {
|
||||
pgProtectionController.Run(ctx, 1)
|
||||
}, controllerName), nil
|
||||
}
|
||||
100
cmd/kube-controller-manager/app/scheduling_test.go
Normal file
100
cmd/kube-controller-manager/app/scheduling_test.go
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
fakeclientset "k8s.io/client-go/kubernetes/fake"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
func TestPodGroupProtectionController(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
enableFeatureGate bool
|
||||
expectController bool
|
||||
}{
|
||||
{
|
||||
name: "controller runs when GenericWorkload is enabled",
|
||||
enableFeatureGate: true,
|
||||
expectController: true,
|
||||
},
|
||||
{
|
||||
name: "controller is disabled when GenericWorkload is disabled",
|
||||
enableFeatureGate: false,
|
||||
expectController: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, test.enableFeatureGate)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
clientset := fakeclientset.NewClientset()
|
||||
controllerCtx := ControllerContext{
|
||||
ClientBuilder: TestClientBuilder{clientset: clientset},
|
||||
InformerFactory: informers.NewSharedInformerFactoryWithOptions(clientset, time.Duration(1)),
|
||||
}
|
||||
controllerCtx.ComponentConfig.Generic.Controllers = []string{names.PodGroupProtectionController}
|
||||
|
||||
// Verify the constructor produces the expected result.
|
||||
ctrl, err := newPodGroupProtectionController(ctx, controllerCtx, names.PodGroupProtectionController)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if test.expectController && ctrl == nil {
|
||||
t.Errorf("expected a controller, got nil")
|
||||
} else if !test.expectController && ctrl != nil {
|
||||
t.Errorf("expected no controller, got %v", ctrl)
|
||||
}
|
||||
|
||||
// Verify the descriptor gates the controller correctly.
|
||||
initFuncCalled := false
|
||||
descriptor := NewControllerDescriptors()[names.PodGroupProtectionController]
|
||||
descriptor.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
|
||||
initFuncCalled = true
|
||||
return newControllerLoop(func(ctx context.Context) {}, controllerName), nil
|
||||
}
|
||||
|
||||
var healthChecks mockHealthCheckAdder
|
||||
if err := runControllers(ctx, controllerCtx, map[string]*ControllerDescriptor{
|
||||
names.PodGroupProtectionController: descriptor,
|
||||
}, &healthChecks); err != nil {
|
||||
t.Fatalf("unexpected error starting controller: %v", err)
|
||||
}
|
||||
if test.expectController != initFuncCalled {
|
||||
t.Errorf("constructor call: expected=%v, got=%v", test.expectController, initFuncCalled)
|
||||
}
|
||||
hasHealthCheck := len(healthChecks.Checks) > 0
|
||||
if test.expectController != hasHealthCheck {
|
||||
t.Errorf("health check: expected=%v, got=%v", test.expectController, hasHealthCheck)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -76,6 +76,7 @@ const (
|
|||
ClusterRoleAggregationController = "clusterrole-aggregation-controller"
|
||||
PersistentVolumeClaimProtectionController = "persistentvolumeclaim-protection-controller"
|
||||
PersistentVolumeProtectionController = "persistentvolume-protection-controller"
|
||||
PodGroupProtectionController = "podgroup-protection-controller"
|
||||
TTLAfterFinishedController = "ttl-after-finished-controller"
|
||||
RootCACertificatePublisherController = "root-ca-certificate-publisher-controller"
|
||||
KubeAPIServerClusterTrustBundlePublisherController = "kube-apiserver-serving-clustertrustbundle-publisher-controller"
|
||||
|
|
|
|||
|
|
@ -39,6 +39,10 @@ const (
|
|||
SystemClusterCritical = SystemPriorityClassPrefix + "cluster-critical"
|
||||
// SystemNodeCritical is the system priority class name that represents node-critical.
|
||||
SystemNodeCritical = SystemPriorityClassPrefix + "node-critical"
|
||||
|
||||
// PodGroupProtectionFinalizer is the finalizer added to PodGroups to prevent
|
||||
// premature deletion while pods still reference them.
|
||||
PodGroupProtectionFinalizer = GroupName + "/podgroup-protection"
|
||||
)
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
|
|
|||
|
|
@ -0,0 +1,339 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podgroupprotection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
schedulingv1alpha2 "k8s.io/api/scheduling/v1alpha2"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
schedulinginformers "k8s.io/client-go/informers/scheduling/v1alpha2"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha2"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/apis/scheduling"
|
||||
"k8s.io/kubernetes/pkg/controller/util/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
)
|
||||
|
||||
const (
|
||||
// The index name for looking up active pods by their
|
||||
// schedulingGroup.podGroupName field.
|
||||
activePodSchedulingGroupIndex = "activePodSchedulingGroup"
|
||||
)
|
||||
|
||||
// Controller manages the PodGroupProtectionFinalizer on PodGroup objects.
|
||||
// The finalizer is stamped at creation time by the PodGroupProtection admission
|
||||
// plugin; this controller removes it when the PodGroup is being deleted and no
|
||||
// active (non-terminated) pods still reference it.
|
||||
type Controller struct {
|
||||
kubeClient clientset.Interface
|
||||
|
||||
podGroupLister schedulinglisters.PodGroupLister
|
||||
podGroupSynced cache.InformerSynced
|
||||
|
||||
podSynced cache.InformerSynced
|
||||
|
||||
// podIndexer has the common Pod indexer installed to
|
||||
// limit iteration over pods to those of interest.
|
||||
podIndexer cache.Indexer
|
||||
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
}
|
||||
|
||||
// NewPodGroupProtectionController returns a new instance of the PodGroup protection controller.
|
||||
func NewPodGroupProtectionController(
|
||||
logger klog.Logger,
|
||||
podGroupInformer schedulinginformers.PodGroupInformer,
|
||||
podInformer coreinformers.PodInformer,
|
||||
kubeClient clientset.Interface,
|
||||
) (*Controller, error) {
|
||||
c := &Controller{
|
||||
kubeClient: kubeClient,
|
||||
podGroupLister: podGroupInformer.Lister(),
|
||||
podGroupSynced: podGroupInformer.Informer().HasSynced,
|
||||
podIndexer: podInformer.Informer().GetIndexer(),
|
||||
podSynced: podInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "podgroupprotection"},
|
||||
),
|
||||
}
|
||||
|
||||
if _, err := podGroupInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
c.handlePodGroupUpdate(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
c.handlePodGroupUpdate(logger, new)
|
||||
},
|
||||
}, cache.HandlerOptions{Logger: &logger}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := addActivePodSchedulingGroupIndexer(c.podIndexer); err != nil {
|
||||
return nil, fmt.Errorf("could not initialize PodGroup protection controller: %w", err)
|
||||
}
|
||||
|
||||
if _, err := podInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
c.handlePodChange(logger, nil, obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
c.handlePodChange(logger, obj, nil)
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
c.handlePodChange(logger, old, new)
|
||||
},
|
||||
}, cache.HandlerOptions{Logger: &logger}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// addActivePodSchedulingGroupIndexer adds an indexer to look up active
|
||||
// pods by their schedulingGroup.podGroupName field so we can efficiently
|
||||
// determine whether a PodGroup still has active pods.
|
||||
func addActivePodSchedulingGroupIndexer(indexer cache.Indexer) error {
|
||||
return indexer.AddIndexers(cache.Indexers{
|
||||
activePodSchedulingGroupIndex: func(obj interface{}) ([]string, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
if isPodTerminated(pod) {
|
||||
return nil, nil
|
||||
}
|
||||
if pod.Spec.SchedulingGroup == nil || pod.Spec.SchedulingGroup.PodGroupName == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return []string{pod.Namespace + "/" + *pod.Spec.SchedulingGroup.PodGroupName}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Run runs the controller goroutines.
|
||||
func (c *Controller) Run(ctx context.Context, workers int) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Starting PodGroup protection controller")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
logger.Info("Shutting down PodGroup protection controller")
|
||||
c.queue.ShutDown()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
if !cache.WaitForNamedCacheSyncWithContext(ctx, c.podGroupSynced, c.podSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for range workers {
|
||||
wg.Go(func() {
|
||||
wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
||||
})
|
||||
}
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (c *Controller) runWorker(ctx context.Context) {
|
||||
for c.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
|
||||
pgKey, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(pgKey)
|
||||
|
||||
err := c.processPodGroup(ctx, pgKey)
|
||||
if err == nil {
|
||||
c.queue.Forget(pgKey)
|
||||
return true
|
||||
}
|
||||
|
||||
c.queue.AddRateLimited(pgKey)
|
||||
utilruntime.HandleError(fmt.Errorf("PodGroup %v failed with: %w", pgKey, err))
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Controller) processPodGroup(ctx context.Context, pgKey string) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.V(4).Info("Processing PodGroup", "podGroup", pgKey)
|
||||
|
||||
pgNamespace, pgName, err := cache.SplitMetaNamespaceKey(pgKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing PodGroup key %q: %w", pgKey, err)
|
||||
}
|
||||
|
||||
pg, err := c.podGroupLister.PodGroups(pgNamespace).Get(pgName)
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.V(4).Info("PodGroup not found, ignoring", "podGroup", pgKey)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !protectionutil.IsDeletionCandidate(pg, scheduling.PodGroupProtectionFinalizer) {
|
||||
return nil
|
||||
}
|
||||
|
||||
isUsed, err := c.hasActivePods(ctx, pg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !isUsed {
|
||||
return c.removeFinalizer(ctx, pg)
|
||||
}
|
||||
logger.V(4).Info("Keeping PodGroup finalizer because it is still being used by pods", "podGroup", klog.KObj(pg))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) removeFinalizer(ctx context.Context, pg *schedulingv1alpha2.PodGroup) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
pgClone := pg.DeepCopy()
|
||||
|
||||
pgClone.Finalizers = slice.RemoveString(pgClone.Finalizers, scheduling.PodGroupProtectionFinalizer, nil)
|
||||
_, err := c.kubeClient.SchedulingV1alpha2().PodGroups(pgClone.Namespace).Update(ctx, pgClone, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
logger.Error(err, "Error removing protection finalizer from PodGroup", "podGroup", klog.KObj(pg))
|
||||
return err
|
||||
}
|
||||
|
||||
logger.V(3).Info("Removed protection finalizer from PodGroup", "podGroup", klog.KObj(pg))
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasActivePods returns true if any active pods reference the PodGroup
|
||||
// via spec.schedulingGroup.podGroupName. The index only contains
|
||||
// non-terminated pods, so a non-empty result means the PodGroup is still in use.
|
||||
func (c *Controller) hasActivePods(ctx context.Context, pg *schedulingv1alpha2.PodGroup) (bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
indexKey := pg.Namespace + "/" + pg.Name
|
||||
|
||||
objs, err := c.podIndexer.ByIndex(activePodSchedulingGroupIndex, indexKey)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("index-based list of active pods failed for PodGroup %s: %w", indexKey, err)
|
||||
}
|
||||
|
||||
if len(objs) > 0 {
|
||||
logger.V(4).Info("Pod is using PodGroup", "pod", klog.KObj(objs[0].(*v1.Pod)), "podGroup", klog.KObj(pg))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
logger.V(4).Info("No active pods found using PodGroup", "podGroup", klog.KObj(pg))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// isPodTerminated returns true if the pod has completed (Succeeded or Failed).
|
||||
func isPodTerminated(pod *v1.Pod) bool {
|
||||
return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
|
||||
}
|
||||
|
||||
// handlePodGroupUpdate handles PodGroup add/update events.
|
||||
// Only deletion candidates which are being deleted and have the finalizer need processing.
|
||||
func (c *Controller) handlePodGroupUpdate(logger klog.Logger, obj interface{}) {
|
||||
pg, ok := obj.(*schedulingv1alpha2.PodGroup)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("PodGroup informer returned non-PodGroup object: %#v", obj))
|
||||
return
|
||||
}
|
||||
if !protectionutil.IsDeletionCandidate(pg, scheduling.PodGroupProtectionFinalizer) {
|
||||
return
|
||||
}
|
||||
key, err := cache.MetaNamespaceKeyFunc(pg)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("couldn't get key for PodGroup %#v: %w", pg, err))
|
||||
return
|
||||
}
|
||||
logger.V(4).Info("Got event on PodGroup", "podGroup", klog.KObj(pg))
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
// handlePodChange handles Pod add/delete/update events.
|
||||
// It enqueues the referenced PodGroup only when the event could affect
|
||||
// finalizer decisions where the pod is deleted or transitioned to a terminal phase.
|
||||
func (c *Controller) handlePodChange(logger klog.Logger, old, new interface{}) {
|
||||
newPod := getPod(new)
|
||||
oldPod := getPod(old)
|
||||
|
||||
if newPod != nil && isPodTerminated(newPod) {
|
||||
c.enqueuePodGroupForPod(logger, newPod)
|
||||
}
|
||||
|
||||
// An update notification might mask the deletion of a pod X and the
|
||||
// following creation of a pod Y with the same namespaced name as X. If
|
||||
// that's the case, X needs to be processed as well to handle the case
|
||||
// where it was the last active pod keeping the finalizer on a PodGroup.
|
||||
if newPod != nil && oldPod != nil && oldPod.UID != newPod.UID {
|
||||
c.enqueuePodGroupForPod(logger, oldPod)
|
||||
}
|
||||
|
||||
if newPod == nil && oldPod != nil {
|
||||
c.enqueuePodGroupForPod(logger, oldPod)
|
||||
}
|
||||
}
|
||||
|
||||
// enqueuePodGroupForPod enqueues the PodGroup referenced by the pod.
|
||||
// Callers are responsible for only passing pods whose state change could allow
|
||||
// finalizer removal (deleted or transitioned to a terminal phase).
|
||||
func (c *Controller) enqueuePodGroupForPod(logger klog.Logger, pod *v1.Pod) {
|
||||
if pod.Spec.SchedulingGroup == nil || pod.Spec.SchedulingGroup.PodGroupName == nil {
|
||||
return
|
||||
}
|
||||
|
||||
pgKey := pod.Namespace + "/" + *pod.Spec.SchedulingGroup.PodGroupName
|
||||
logger.V(4).Info("Enqueuing PodGroup for pod event", "pod", klog.KObj(pod), "podGroup", pgKey)
|
||||
c.queue.Add(pgKey)
|
||||
}
|
||||
|
||||
func getPod(obj interface{}) *v1.Pod {
|
||||
if obj == nil {
|
||||
return nil
|
||||
}
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
|
||||
return nil
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod %#v", obj))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
|
@ -0,0 +1,551 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podgroupprotection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
schedulingv1alpha2 "k8s.io/api/scheduling/v1alpha2"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/pkg/apis/scheduling"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultNS = "default"
|
||||
defaultPGName = "my-podgroup"
|
||||
defaultPGUID = "pg-uid-1"
|
||||
)
|
||||
|
||||
func podGroup() *schedulingv1alpha2.PodGroup {
|
||||
return &schedulingv1alpha2.PodGroup{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: defaultPGName,
|
||||
Namespace: defaultNS,
|
||||
UID: defaultPGUID,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func withFinalizer(pg *schedulingv1alpha2.PodGroup) *schedulingv1alpha2.PodGroup {
|
||||
pg.Finalizers = append(pg.Finalizers, scheduling.PodGroupProtectionFinalizer)
|
||||
return pg
|
||||
}
|
||||
|
||||
func deletedPodGroup(pg *schedulingv1alpha2.PodGroup) *schedulingv1alpha2.PodGroup {
|
||||
pg.DeletionTimestamp = &metav1.Time{}
|
||||
return pg
|
||||
}
|
||||
|
||||
func podForPG(name string, pgName string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: defaultNS,
|
||||
UID: types.UID(name + "-uid"),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node-1",
|
||||
SchedulingGroup: &v1.PodSchedulingGroup{
|
||||
PodGroupName: ptr.To(pgName),
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func terminatedPod(pod *v1.Pod, phase v1.PodPhase) *v1.Pod {
|
||||
pod.Status.Phase = phase
|
||||
return pod
|
||||
}
|
||||
|
||||
func unscheduledPod(pod *v1.Pod) *v1.Pod {
|
||||
pod.Spec.NodeName = ""
|
||||
return pod
|
||||
}
|
||||
|
||||
func podWithoutSchedulingGroup(name string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: defaultNS,
|
||||
UID: types.UID(name + "-uid"),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node-1",
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsPodTerminated(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
phase v1.PodPhase
|
||||
want bool
|
||||
}{
|
||||
"running": {phase: v1.PodRunning, want: false},
|
||||
"pending": {phase: v1.PodPending, want: false},
|
||||
"succeeded": {phase: v1.PodSucceeded, want: true},
|
||||
"failed": {phase: v1.PodFailed, want: true},
|
||||
}
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
pod := &v1.Pod{Status: v1.PodStatus{Phase: tc.phase}}
|
||||
if got := isPodTerminated(pod); got != tc.want {
|
||||
t.Errorf("isPodTerminated(%v) = %v, want %v", tc.phase, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPod(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
obj interface{}
|
||||
want bool
|
||||
}{
|
||||
"nil": {
|
||||
obj: nil,
|
||||
want: false,
|
||||
},
|
||||
"pod": {
|
||||
obj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}},
|
||||
want: true,
|
||||
},
|
||||
"tombstone with pod": {
|
||||
obj: cache.DeletedFinalStateUnknown{
|
||||
Key: "default/p",
|
||||
Obj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
"tombstone with non-pod": {
|
||||
obj: cache.DeletedFinalStateUnknown{
|
||||
Key: "default/p",
|
||||
Obj: &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "p"}},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
"non-pod object": {
|
||||
obj: &v1.ConfigMap{},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
got := getPod(tc.obj)
|
||||
if (got != nil) != tc.want {
|
||||
t.Errorf("parsePod() returned pod=%v, want non-nil=%v", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePodChange(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
tests := map[string]struct {
|
||||
old interface{}
|
||||
new interface{}
|
||||
wantSize int
|
||||
}{
|
||||
"deleted pod with schedulingGroup enqueues": {
|
||||
old: podForPG("pod-1", defaultPGName),
|
||||
new: nil,
|
||||
wantSize: 1,
|
||||
},
|
||||
"terminated pod enqueues": {
|
||||
old: podForPG("pod-1", defaultPGName),
|
||||
new: terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded),
|
||||
wantSize: 1,
|
||||
},
|
||||
"unscheduled new pod does not enqueue": {
|
||||
old: nil,
|
||||
new: unscheduledPod(podForPG("pod-1", defaultPGName)),
|
||||
wantSize: 0,
|
||||
},
|
||||
"scheduled running pod does not enqueue": {
|
||||
old: nil,
|
||||
new: podForPG("pod-1", defaultPGName),
|
||||
wantSize: 0,
|
||||
},
|
||||
"deleted pod without schedulingGroup does not enqueue": {
|
||||
old: podWithoutSchedulingGroup("pod-1"),
|
||||
new: nil,
|
||||
wantSize: 0,
|
||||
},
|
||||
"deleted pod with nil podGroupName does not enqueue": {
|
||||
old: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: defaultNS},
|
||||
Spec: v1.PodSpec{
|
||||
SchedulingGroup: &v1.PodSchedulingGroup{PodGroupName: nil},
|
||||
},
|
||||
},
|
||||
new: nil,
|
||||
wantSize: 0,
|
||||
},
|
||||
"UID mismatch with terminated new pod, same PodGroup, deduplicates enqueue": {
|
||||
old: func() *v1.Pod {
|
||||
p := podForPG("pod-1", defaultPGName)
|
||||
p.UID = "old-uid"
|
||||
return p
|
||||
}(),
|
||||
new: func() interface{} {
|
||||
p := terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)
|
||||
p.UID = "new-uid"
|
||||
return p
|
||||
}(),
|
||||
wantSize: 1,
|
||||
},
|
||||
"UID mismatch with terminated new pod referencing different PodGroup enqueues both": {
|
||||
old: func() *v1.Pod {
|
||||
p := podForPG("pod-1", defaultPGName)
|
||||
p.UID = "old-uid"
|
||||
return p
|
||||
}(),
|
||||
new: func() interface{} {
|
||||
p := terminatedPod(podForPG("pod-1", "other-pg"), v1.PodFailed)
|
||||
p.UID = "new-uid"
|
||||
return p
|
||||
}(),
|
||||
wantSize: 2,
|
||||
},
|
||||
"UID mismatch on update with non-terminated new pod enqueues only old pod": {
|
||||
old: func() *v1.Pod {
|
||||
p := podForPG("pod-1", defaultPGName)
|
||||
p.UID = "old-uid"
|
||||
return p
|
||||
}(),
|
||||
new: func() interface{} {
|
||||
p := unscheduledPod(podForPG("pod-1", defaultPGName))
|
||||
p.UID = "new-uid"
|
||||
return p
|
||||
}(),
|
||||
wantSize: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
c := &Controller{
|
||||
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
|
||||
}
|
||||
defer c.queue.ShutDown()
|
||||
c.handlePodChange(logger, tc.old, tc.new)
|
||||
|
||||
if c.queue.Len() != tc.wantSize {
|
||||
t.Errorf("queue size = %d, want %d", c.queue.Len(), tc.wantSize)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodGroupProtectionController(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
// Objects to seed into the fake client before the controller starts.
|
||||
initialObjects []runtime.Object
|
||||
// Pod to delete (by name in defaultNS) after the controller starts.
|
||||
podToDelete string
|
||||
// Whether the finalizer should be present on the PodGroup after the
|
||||
// controller has finished processing.
|
||||
expectFinalizer bool
|
||||
}{
|
||||
{
|
||||
name: "new PodGroup without finalizer, no action (admission plugin handles it)",
|
||||
initialObjects: []runtime.Object{podGroup()},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "new PodGroup without finalizer, active pod exists, then no action (admission plugin handles it)",
|
||||
initialObjects: []runtime.Object{podGroup(), podForPG("pod-1", defaultPGName)},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "PodGroup with finalizer, not being deleted, then no action",
|
||||
initialObjects: []runtime.Object{withFinalizer(podGroup())},
|
||||
expectFinalizer: true,
|
||||
},
|
||||
{
|
||||
name: "deleted PodGroup with finalizer, no active pods, then finalizer is removed",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup()))},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "deleted PodGroup with finalizer, active pod exists, then finalizer is kept",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), podForPG("pod-1", defaultPGName)},
|
||||
expectFinalizer: true,
|
||||
},
|
||||
{
|
||||
name: "deleted PodGroup with finalizer, only terminated pods, then finalizer is removed",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "deleted PodGroup with finalizer, mix of active and terminated pods, then finalizer is kept",
|
||||
initialObjects: []runtime.Object{
|
||||
deletedPodGroup(withFinalizer(podGroup())),
|
||||
podForPG("pod-active", defaultPGName),
|
||||
terminatedPod(podForPG("pod-done", defaultPGName), v1.PodSucceeded),
|
||||
},
|
||||
expectFinalizer: true,
|
||||
},
|
||||
{
|
||||
name: "PodGroup without finalizer, already deleted, then no action (not a deletion candidate)",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(podGroup())},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "PodGroup without finalizer, already deleted, active pods exist, then no action (should not add finalizer to deleting object)",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(podGroup()), podForPG("pod-1", defaultPGName)},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "pod deleted, PodGroup being deleted with finalizer, was last active pod, finalizer is removed",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), podForPG("pod-1", defaultPGName)},
|
||||
podToDelete: "pod-1",
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "pod terminated succeeded, PodGroup being deleted with finalizer, was last active pod, then finalizer is removed",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "pod terminated failed, PodGroup being deleted with finalizer, was last active pod, then finalizer is removed",
|
||||
initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), terminatedPod(podForPG("pod-1", defaultPGName), v1.PodFailed)},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "new unscheduled pod, PodGroup without finalizer, then no action (admission plugin handles it)",
|
||||
initialObjects: []runtime.Object{podGroup(), unscheduledPod(podForPG("pod-1", defaultPGName))},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
{
|
||||
name: "pod without schedulingGroup -> no PodGroup action",
|
||||
initialObjects: []runtime.Object{withFinalizer(podGroup()), podWithoutSchedulingGroup("pod-1")},
|
||||
expectFinalizer: true,
|
||||
},
|
||||
{
|
||||
name: "terminated pod references non-existent PodGroup, controller handles gracefully",
|
||||
initialObjects: []runtime.Object{terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)},
|
||||
expectFinalizer: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
client := fake.NewClientset(test.initialObjects...)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
pgInformer := informerFactory.Scheduling().V1alpha2().PodGroups()
|
||||
podInformer := informerFactory.Core().V1().Pods()
|
||||
|
||||
ctrl, err := NewPodGroupProtectionController(klog.FromContext(ctx), pgInformer, podInformer, client)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating controller: %v", err)
|
||||
}
|
||||
|
||||
informerFactory.Start(ctx.Done())
|
||||
informerFactory.WaitForCacheSync(ctx.Done())
|
||||
go ctrl.Run(ctx, 1)
|
||||
|
||||
if test.podToDelete != "" {
|
||||
if err := client.CoreV1().Pods(defaultNS).Delete(ctx, test.podToDelete, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatalf("deleting pod: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
pg, err := client.SchedulingV1alpha2().PodGroups(defaultNS).Get(ctx, defaultPGName, metav1.GetOptions{})
|
||||
if apierrors.IsNotFound(err) {
|
||||
return !test.expectFinalizer, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
hasFinalizer := slices.Contains(pg.Finalizers, scheduling.PodGroupProtectionFinalizer)
|
||||
return hasFinalizer == test.expectFinalizer, nil
|
||||
}); err != nil {
|
||||
t.Fatalf("timed out waiting for expected finalizer state (want present=%v): %v", test.expectFinalizer, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivePodSchedulingGroupIndexer(t *testing.T) {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
||||
if err := addActivePodSchedulingGroupIndexer(indexer); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
pod1 := podForPG("pod-1", "pg-a")
|
||||
_ = indexer.Add(pod1)
|
||||
|
||||
pod2 := podWithoutSchedulingGroup("pod-2")
|
||||
_ = indexer.Add(pod2)
|
||||
|
||||
pod3 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-3", Namespace: defaultNS, UID: "pod-3-uid"},
|
||||
Spec: v1.PodSpec{
|
||||
SchedulingGroup: &v1.PodSchedulingGroup{PodGroupName: nil},
|
||||
},
|
||||
}
|
||||
_ = indexer.Add(pod3)
|
||||
|
||||
// Terminated pod should not appear in the index.
|
||||
pod4 := terminatedPod(podForPG("pod-4", "pg-a"), v1.PodSucceeded)
|
||||
_ = indexer.Add(pod4)
|
||||
|
||||
objs, err := indexer.ByIndex(activePodSchedulingGroupIndex, defaultNS+"/pg-a")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(objs) != 1 {
|
||||
t.Fatalf("expected 1 active pod for pg-a, got %d", len(objs))
|
||||
}
|
||||
if objs[0].(*v1.Pod).Name != "pod-1" {
|
||||
t.Errorf("expected pod-1, got %s", objs[0].(*v1.Pod).Name)
|
||||
}
|
||||
|
||||
// Nonexistent PodGroup should return empty.
|
||||
objs, err = indexer.ByIndex(activePodSchedulingGroupIndex, defaultNS+"/nonexistent")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(objs) != 0 {
|
||||
t.Errorf("expected 0 pods for nonexistent PodGroup, got %d", len(objs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestHasActivePods(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
pods []runtime.Object
|
||||
want bool
|
||||
}{
|
||||
"no pods": {
|
||||
pods: nil,
|
||||
want: false,
|
||||
},
|
||||
"active pod referencing PodGroup": {
|
||||
pods: []runtime.Object{
|
||||
podForPG("pod-1", defaultPGName),
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
"only terminated pods": {
|
||||
pods: []runtime.Object{
|
||||
terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded),
|
||||
terminatedPod(podForPG("pod-2", defaultPGName), v1.PodFailed),
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
"mix of active and terminated": {
|
||||
pods: []runtime.Object{
|
||||
podForPG("pod-active", defaultPGName),
|
||||
terminatedPod(podForPG("pod-done", defaultPGName), v1.PodSucceeded),
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
"pods referencing different PodGroup": {
|
||||
pods: []runtime.Object{
|
||||
podForPG("pod-1", "other-pg"),
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
||||
if err := addActivePodSchedulingGroupIndexer(indexer); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
for _, obj := range tc.pods {
|
||||
_ = indexer.Add(obj)
|
||||
}
|
||||
|
||||
ctrl := &Controller{
|
||||
podIndexer: indexer,
|
||||
}
|
||||
pg := podGroup()
|
||||
got, err := ctrl.hasActivePods(context.Background(), pg)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if got != tc.want {
|
||||
t.Errorf("hasActivePods() = %v, want %v", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlePodGroupUpdate(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
tests := map[string]struct {
|
||||
pg *schedulingv1alpha2.PodGroup
|
||||
wantSize int
|
||||
}{
|
||||
"PodGroup without finalizer, not deleting/not enqueued": {
|
||||
pg: podGroup(),
|
||||
wantSize: 0,
|
||||
},
|
||||
"PodGroup is deletion candidate -> enqueued": {
|
||||
pg: deletedPodGroup(withFinalizer(podGroup())),
|
||||
wantSize: 1,
|
||||
},
|
||||
"PodGroup has finalizer, not deleting -> not enqueued": {
|
||||
pg: withFinalizer(podGroup()),
|
||||
wantSize: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
c := &Controller{
|
||||
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
|
||||
}
|
||||
defer c.queue.ShutDown()
|
||||
c.handlePodGroupUpdate(logger, tc.pg)
|
||||
if c.queue.Len() != tc.wantSize {
|
||||
t.Errorf("queue size = %d, want %d", c.queue.Len(), tc.wantSize)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -34,8 +34,8 @@ import (
|
|||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/component-helpers/storage/ephemeral"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller/util/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/common"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import (
|
|||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/controller/util/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import (
|
|||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/controller/util/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/utils/ptr"
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ import (
|
|||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
|
||||
"k8s.io/kubernetes/pkg/controller/util/protectionutil"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/utils/dump"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import (
|
|||
"k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"
|
||||
podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/runtimeclass"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/scheduling/podgroupprotection"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/storage/storageclass/setdefault"
|
||||
|
|
@ -40,6 +41,7 @@ var intentionallyOffPlugins = sets.New[string](
|
|||
setdefault.PluginName, // DefaultStorageClass
|
||||
resize.PluginName, // PersistentVolumeClaimResize
|
||||
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
|
||||
podgroupprotection.PluginName, // PodGroupProtection
|
||||
podpriority.PluginName, // Priority
|
||||
nodetaint.PluginName, // TaintNodesByCondition
|
||||
runtimeclass.PluginName, // RuntimeClass
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ import (
|
|||
"k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"
|
||||
podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/runtimeclass"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/scheduling/podgroupprotection"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
|
||||
|
|
@ -89,6 +90,7 @@ var AllOrderedPlugins = []string{
|
|||
extendedresourcetoleration.PluginName, // ExtendedResourceToleration
|
||||
setdefault.PluginName, // DefaultStorageClass
|
||||
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
|
||||
podgroupprotection.PluginName, // PodGroupProtection
|
||||
gc.PluginName, // OwnerReferencesPermissionEnforcement
|
||||
resize.PluginName, // PersistentVolumeClaimResize
|
||||
runtimeclass.PluginName, // RuntimeClass
|
||||
|
|
@ -148,6 +150,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
|
|||
setdefault.Register(plugins)
|
||||
resize.Register(plugins)
|
||||
storageobjectinuseprotection.Register(plugins)
|
||||
podgroupprotection.Register(plugins)
|
||||
certapproval.Register(plugins)
|
||||
certsigning.Register(plugins)
|
||||
ctbattest.Register(plugins)
|
||||
|
|
@ -170,6 +173,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] {
|
|||
validatingwebhook.PluginName, // ValidatingAdmissionWebhook
|
||||
resourcequota.PluginName, // ResourceQuota
|
||||
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
|
||||
podgroupprotection.PluginName, // PodGroupProtection
|
||||
podpriority.PluginName, // Priority
|
||||
nodetaint.PluginName, // TaintNodesByCondition
|
||||
runtimeclass.PluginName, // RuntimeClass
|
||||
|
|
|
|||
101
plugin/pkg/admission/scheduling/podgroupprotection/admission.go
Normal file
101
plugin/pkg/admission/scheduling/podgroupprotection/admission.go
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podgroupprotection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"slices"
|
||||
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
apiserveradmission "k8s.io/apiserver/pkg/admission/initializer"
|
||||
"k8s.io/component-base/featuregate"
|
||||
"k8s.io/klog/v2"
|
||||
schedulingapi "k8s.io/kubernetes/pkg/apis/scheduling"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
const (
|
||||
PluginName = "PodGroupProtection"
|
||||
)
|
||||
|
||||
// Register registers the plugin.
|
||||
func Register(plugins *admission.Plugins) {
|
||||
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
|
||||
return newPlugin(), nil
|
||||
})
|
||||
}
|
||||
|
||||
type podGroupProtectionPlugin struct {
|
||||
*admission.Handler
|
||||
enabled bool
|
||||
inspectedFeatureGates bool
|
||||
}
|
||||
|
||||
var _ admission.MutationInterface = &podGroupProtectionPlugin{}
|
||||
var _ apiserveradmission.WantsFeatures = &podGroupProtectionPlugin{}
|
||||
|
||||
func newPlugin() *podGroupProtectionPlugin {
|
||||
return &podGroupProtectionPlugin{
|
||||
Handler: admission.NewHandler(admission.Create),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *podGroupProtectionPlugin) InspectFeatureGates(featureGates featuregate.FeatureGate) {
|
||||
p.enabled = featureGates.Enabled(features.GenericWorkload)
|
||||
p.inspectedFeatureGates = true
|
||||
}
|
||||
|
||||
func (p *podGroupProtectionPlugin) ValidateInitialization() error {
|
||||
if !p.inspectedFeatureGates {
|
||||
return fmt.Errorf("feature gates not inspected")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var podGroupResource = schedulingapi.Resource("podgroups")
|
||||
|
||||
// Admit stamps the PodGroupProtectionFinalizer on every newly created PodGroup
|
||||
// so that it cannot be deleted while pods still reference it.
|
||||
// The finalizer is removed by the PodGroupProtection controller when the
|
||||
// PodGroup is no longer in use.
|
||||
func (p *podGroupProtectionPlugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
|
||||
if !p.enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
if a.GetResource().GroupResource() != podGroupResource {
|
||||
return nil
|
||||
}
|
||||
if len(a.GetSubresource()) != 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
pg, ok := a.GetObject().(*schedulingapi.PodGroup)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if slices.Contains(pg.Finalizers, schedulingapi.PodGroupProtectionFinalizer) {
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.V(4).InfoS("Adding protection finalizer to PodGroup", "podGroup", klog.KObj(pg))
|
||||
pg.Finalizers = append(pg.Finalizers, schedulingapi.PodGroupProtectionFinalizer)
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podgroupprotection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
schedulingapi "k8s.io/kubernetes/pkg/apis/scheduling"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/utils/dump"
|
||||
)
|
||||
|
||||
func TestAdmit(t *testing.T) {
|
||||
pg := &schedulingapi.PodGroup{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "PodGroup"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "my-podgroup",
|
||||
Namespace: "default",
|
||||
},
|
||||
}
|
||||
|
||||
pgWithFinalizer := pg.DeepCopy()
|
||||
pgWithFinalizer.Finalizers = []string{schedulingapi.PodGroupProtectionFinalizer}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
enabled bool
|
||||
resource schema.GroupVersionResource
|
||||
object runtime.Object
|
||||
expectedObject runtime.Object
|
||||
namespace string
|
||||
}{
|
||||
{
|
||||
name: "podgroup create with plugin enabled, add finalizer",
|
||||
enabled: true,
|
||||
resource: schedulingapi.SchemeGroupVersion.WithResource("podgroups"),
|
||||
object: pg,
|
||||
expectedObject: pgWithFinalizer,
|
||||
namespace: pg.Namespace,
|
||||
},
|
||||
{
|
||||
name: "podgroup finalizer already exists, no new finalizer",
|
||||
enabled: true,
|
||||
resource: schedulingapi.SchemeGroupVersion.WithResource("podgroups"),
|
||||
object: pgWithFinalizer,
|
||||
expectedObject: pgWithFinalizer,
|
||||
namespace: pgWithFinalizer.Namespace,
|
||||
},
|
||||
{
|
||||
name: "podgroup create with plugin disabled, no finalizer added",
|
||||
enabled: false,
|
||||
resource: schedulingapi.SchemeGroupVersion.WithResource("podgroups"),
|
||||
object: pg,
|
||||
expectedObject: pg,
|
||||
namespace: pg.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, test.enabled)
|
||||
|
||||
ctrl := newPlugin()
|
||||
ctrl.InspectFeatureGates(utilfeature.DefaultFeatureGate)
|
||||
|
||||
obj := test.object.DeepCopyObject()
|
||||
attrs := admission.NewAttributesRecord(
|
||||
obj,
|
||||
obj.DeepCopyObject(),
|
||||
schema.GroupVersionKind{},
|
||||
test.namespace,
|
||||
"foo",
|
||||
test.resource,
|
||||
"",
|
||||
admission.Create,
|
||||
&metav1.CreateOptions{},
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
|
||||
if err := ctrl.Admit(context.TODO(), attrs, nil); err != nil {
|
||||
t.Errorf("got unexpected error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expectedObject, obj) {
|
||||
t.Errorf("Expected object:\n%s\ngot:\n%s", dump.Pretty(test.expectedObject), dump.Pretty(obj))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -466,6 +466,16 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
|
|||
})
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
|
||||
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "podgroup-protection-controller"},
|
||||
Rules: []rbacv1.PolicyRule{
|
||||
rbacv1helpers.NewRule("get", "list", "watch", "update").Groups(schedulingGroup).Resources("podgroups").RuleOrDie(),
|
||||
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-after-finished-controller"},
|
||||
Rules: []rbacv1.PolicyRule{
|
||||
|
|
|
|||
|
|
@ -21,8 +21,12 @@ import (
|
|||
"slices"
|
||||
"testing"
|
||||
|
||||
rbacv1 "k8s.io/api/rbac/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
// rolesWithAllowStar are the controller roles which are allowed to contain a *. These are
|
||||
|
|
@ -38,7 +42,7 @@ var rolesWithAllowStar = sets.NewString(
|
|||
)
|
||||
|
||||
// TestNoStarsForControllers confirms that no controller role has star verbs, groups,
|
||||
// or resources. There are three known exceptions: namespace lifecycle and GC which have to
|
||||
// or resources. There are three known exceptions: namespace lifecycle and GC which have to
|
||||
// delete anything, and HPA, which has the power to read metrics associated
|
||||
// with any object.
|
||||
func TestNoStarsForControllers(t *testing.T) {
|
||||
|
|
@ -93,6 +97,80 @@ func TestControllerRoleLabel(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPodGroupProtectionControllerRBAC(t *testing.T) {
|
||||
roleName := saRolePrefix + "podgroup-protection-controller"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
enableFeatureGate bool
|
||||
expectRole bool
|
||||
}{
|
||||
{
|
||||
name: "role and binding absent when GenericWorkload is disabled",
|
||||
enableFeatureGate: false,
|
||||
expectRole: false,
|
||||
},
|
||||
{
|
||||
name: "role and binding present when GenericWorkload is enabled",
|
||||
enableFeatureGate: true,
|
||||
expectRole: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, test.enableFeatureGate)
|
||||
|
||||
var foundRole *rbacv1.ClusterRole
|
||||
for i, role := range ControllerRoles() {
|
||||
if role.Name == roleName {
|
||||
foundRole = &ControllerRoles()[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !test.expectRole {
|
||||
if foundRole != nil {
|
||||
t.Fatalf("role %q should not exist when GenericWorkload is disabled", roleName)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if foundRole == nil {
|
||||
t.Fatalf("role %q not found when GenericWorkload is enabled", roleName)
|
||||
}
|
||||
|
||||
wantRules := []rbacv1.PolicyRule{
|
||||
{Verbs: []string{"get", "list", "update", "watch"}, APIGroups: []string{"scheduling.k8s.io"}, Resources: []string{"podgroups"}},
|
||||
{Verbs: []string{"get", "list", "watch"}, APIGroups: []string{""}, Resources: []string{"pods"}},
|
||||
}
|
||||
if !reflect.DeepEqual(foundRole.Rules, wantRules) {
|
||||
t.Errorf("unexpected rules:\ngot: %+v\nwant: %+v", foundRole.Rules, wantRules)
|
||||
}
|
||||
|
||||
var foundBinding *rbacv1.ClusterRoleBinding
|
||||
for i, binding := range ControllerRoleBindings() {
|
||||
if binding.RoleRef.Name == roleName {
|
||||
foundBinding = &ControllerRoleBindings()[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if foundBinding == nil {
|
||||
t.Fatalf("binding for %q not found", roleName)
|
||||
}
|
||||
if len(foundBinding.Subjects) != 1 {
|
||||
t.Fatalf("expected 1 subject, got %d", len(foundBinding.Subjects))
|
||||
}
|
||||
if got, want := foundBinding.Subjects[0].Name, "podgroup-protection-controller"; got != want {
|
||||
t.Errorf("subject name = %q, want %q", got, want)
|
||||
}
|
||||
if got, want := foundBinding.Subjects[0].Namespace, "kube-system"; got != want {
|
||||
t.Errorf("subject namespace = %q, want %q", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestControllerRoleVerbsConsistency(t *testing.T) {
|
||||
roles := ControllerRoles()
|
||||
for _, role := range roles {
|
||||
|
|
|
|||
Loading…
Reference in a new issue