diff --git a/cmd/kube-controller-manager/app/controller_descriptor.go b/cmd/kube-controller-manager/app/controller_descriptor.go index fa82992f817..21ecc82faed 100644 --- a/cmd/kube-controller-manager/app/controller_descriptor.go +++ b/cmd/kube-controller-manager/app/controller_descriptor.go @@ -194,6 +194,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor { register(newGarbageCollectorControllerDescriptor()) register(newDaemonSetControllerDescriptor()) register(newJobControllerDescriptor()) + register(newPodGroupProtectionControllerDescriptor()) register(newDeploymentControllerDescriptor()) register(newReplicaSetControllerDescriptor()) register(newHorizontalPodAutoscalerControllerDescriptor()) diff --git a/cmd/kube-controller-manager/app/controllermanager_test.go b/cmd/kube-controller-manager/app/controllermanager_test.go index f78f393187a..f31c988a0b2 100644 --- a/cmd/kube-controller-manager/app/controllermanager_test.go +++ b/cmd/kube-controller-manager/app/controllermanager_test.go @@ -64,6 +64,7 @@ func TestControllerNamesDeclaration(t *testing.T) { names.GarbageCollectorController, names.DaemonSetController, names.JobController, + names.PodGroupProtectionController, names.DeploymentController, names.ReplicaSetController, names.HorizontalPodAutoscalerController, diff --git a/cmd/kube-controller-manager/app/scheduling.go b/cmd/kube-controller-manager/app/scheduling.go new file mode 100644 index 00000000000..919e7301da0 --- /dev/null +++ b/cmd/kube-controller-manager/app/scheduling.go @@ -0,0 +1,64 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "fmt" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/component-base/featuregate" + "k8s.io/klog/v2" + + "k8s.io/kubernetes/cmd/kube-controller-manager/names" + "k8s.io/kubernetes/pkg/controller/scheduling/podgroupprotection" + "k8s.io/kubernetes/pkg/features" +) + +func newPodGroupProtectionControllerDescriptor() *ControllerDescriptor { + return &ControllerDescriptor{ + name: names.PodGroupProtectionController, + constructor: newPodGroupProtectionController, + requiredFeatureGates: []featuregate.Feature{ + features.GenericWorkload, + }, + } +} + +func newPodGroupProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) { + return nil, nil + } + client, err := controllerContext.NewClient(controllerName) + if err != nil { + return nil, err + } + + pgProtectionController, err := podgroupprotection.NewPodGroupProtectionController( + klog.FromContext(ctx), + controllerContext.InformerFactory.Scheduling().V1alpha2().PodGroups(), + controllerContext.InformerFactory.Core().V1().Pods(), + client, + ) + if err != nil { + return nil, fmt.Errorf("failed to init %s: %w", controllerName, err) + } + + return newControllerLoop(func(ctx context.Context) { + pgProtectionController.Run(ctx, 1) + }, controllerName), nil +} diff --git a/cmd/kube-controller-manager/app/scheduling_test.go b/cmd/kube-controller-manager/app/scheduling_test.go new file mode 100644 index 00000000000..44be0a46419 --- /dev/null +++ b/cmd/kube-controller-manager/app/scheduling_test.go @@ -0,0 +1,100 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "testing" + "time" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + fakeclientset "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" + + "k8s.io/kubernetes/cmd/kube-controller-manager/names" + "k8s.io/kubernetes/pkg/features" +) + +func TestPodGroupProtectionController(t *testing.T) { + tests := []struct { + name string + enableFeatureGate bool + expectController bool + }{ + { + name: "controller runs when GenericWorkload is enabled", + enableFeatureGate: true, + expectController: true, + }, + { + name: "controller is disabled when GenericWorkload is disabled", + enableFeatureGate: false, + expectController: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, test.enableFeatureGate) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + clientset := fakeclientset.NewClientset() + controllerCtx := ControllerContext{ + ClientBuilder: TestClientBuilder{clientset: clientset}, + InformerFactory: informers.NewSharedInformerFactoryWithOptions(clientset, time.Duration(1)), + } + controllerCtx.ComponentConfig.Generic.Controllers = []string{names.PodGroupProtectionController} + + // Verify the constructor produces the expected result. + ctrl, err := newPodGroupProtectionController(ctx, controllerCtx, names.PodGroupProtectionController) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if test.expectController && ctrl == nil { + t.Errorf("expected a controller, got nil") + } else if !test.expectController && ctrl != nil { + t.Errorf("expected no controller, got %v", ctrl) + } + + // Verify the descriptor gates the controller correctly. + initFuncCalled := false + descriptor := NewControllerDescriptors()[names.PodGroupProtectionController] + descriptor.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + initFuncCalled = true + return newControllerLoop(func(ctx context.Context) {}, controllerName), nil + } + + var healthChecks mockHealthCheckAdder + if err := runControllers(ctx, controllerCtx, map[string]*ControllerDescriptor{ + names.PodGroupProtectionController: descriptor, + }, &healthChecks); err != nil { + t.Fatalf("unexpected error starting controller: %v", err) + } + if test.expectController != initFuncCalled { + t.Errorf("constructor call: expected=%v, got=%v", test.expectController, initFuncCalled) + } + hasHealthCheck := len(healthChecks.Checks) > 0 + if test.expectController != hasHealthCheck { + t.Errorf("health check: expected=%v, got=%v", test.expectController, hasHealthCheck) + } + }) + } +} diff --git a/cmd/kube-controller-manager/names/controller_names.go b/cmd/kube-controller-manager/names/controller_names.go index baaa97c076b..568087ab30e 100644 --- a/cmd/kube-controller-manager/names/controller_names.go +++ b/cmd/kube-controller-manager/names/controller_names.go @@ -76,6 +76,7 @@ const ( ClusterRoleAggregationController = "clusterrole-aggregation-controller" PersistentVolumeClaimProtectionController = "persistentvolumeclaim-protection-controller" PersistentVolumeProtectionController = "persistentvolume-protection-controller" + PodGroupProtectionController = "podgroup-protection-controller" TTLAfterFinishedController = "ttl-after-finished-controller" RootCACertificatePublisherController = "root-ca-certificate-publisher-controller" KubeAPIServerClusterTrustBundlePublisherController = "kube-apiserver-serving-clustertrustbundle-publisher-controller" diff --git a/pkg/apis/scheduling/types.go b/pkg/apis/scheduling/types.go index a700f42342f..e8943cd6519 100644 --- a/pkg/apis/scheduling/types.go +++ b/pkg/apis/scheduling/types.go @@ -39,6 +39,10 @@ const ( SystemClusterCritical = SystemPriorityClassPrefix + "cluster-critical" // SystemNodeCritical is the system priority class name that represents node-critical. SystemNodeCritical = SystemPriorityClassPrefix + "node-critical" + + // PodGroupProtectionFinalizer is the finalizer added to PodGroups to prevent + // premature deletion while pods still reference them. + PodGroupProtectionFinalizer = GroupName + "/podgroup-protection" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/scheduling/podgroupprotection/podgroup_protection_controller.go b/pkg/controller/scheduling/podgroupprotection/podgroup_protection_controller.go new file mode 100644 index 00000000000..c7eadb7426f --- /dev/null +++ b/pkg/controller/scheduling/podgroupprotection/podgroup_protection_controller.go @@ -0,0 +1,339 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroupprotection + +import ( + "context" + "fmt" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + schedulingv1alpha2 "k8s.io/api/scheduling/v1alpha2" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + schedulinginformers "k8s.io/client-go/informers/scheduling/v1alpha2" + clientset "k8s.io/client-go/kubernetes" + schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha2" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/controller/util/protectionutil" + "k8s.io/kubernetes/pkg/util/slice" +) + +const ( + // The index name for looking up active pods by their + // schedulingGroup.podGroupName field. + activePodSchedulingGroupIndex = "activePodSchedulingGroup" +) + +// Controller manages the PodGroupProtectionFinalizer on PodGroup objects. +// The finalizer is stamped at creation time by the PodGroupProtection admission +// plugin; this controller removes it when the PodGroup is being deleted and no +// active (non-terminated) pods still reference it. +type Controller struct { + kubeClient clientset.Interface + + podGroupLister schedulinglisters.PodGroupLister + podGroupSynced cache.InformerSynced + + podSynced cache.InformerSynced + + // podIndexer has the common Pod indexer installed to + // limit iteration over pods to those of interest. + podIndexer cache.Indexer + + queue workqueue.TypedRateLimitingInterface[string] +} + +// NewPodGroupProtectionController returns a new instance of the PodGroup protection controller. +func NewPodGroupProtectionController( + logger klog.Logger, + podGroupInformer schedulinginformers.PodGroupInformer, + podInformer coreinformers.PodInformer, + kubeClient clientset.Interface, +) (*Controller, error) { + c := &Controller{ + kubeClient: kubeClient, + podGroupLister: podGroupInformer.Lister(), + podGroupSynced: podGroupInformer.Informer().HasSynced, + podIndexer: podInformer.Informer().GetIndexer(), + podSynced: podInformer.Informer().HasSynced, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{Name: "podgroupprotection"}, + ), + } + + if _, err := podGroupInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.handlePodGroupUpdate(logger, obj) + }, + UpdateFunc: func(old, new interface{}) { + c.handlePodGroupUpdate(logger, new) + }, + }, cache.HandlerOptions{Logger: &logger}); err != nil { + return nil, err + } + + if err := addActivePodSchedulingGroupIndexer(c.podIndexer); err != nil { + return nil, fmt.Errorf("could not initialize PodGroup protection controller: %w", err) + } + + if _, err := podInformer.Informer().AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.handlePodChange(logger, nil, obj) + }, + DeleteFunc: func(obj interface{}) { + c.handlePodChange(logger, obj, nil) + }, + UpdateFunc: func(old, new interface{}) { + c.handlePodChange(logger, old, new) + }, + }, cache.HandlerOptions{Logger: &logger}); err != nil { + return nil, err + } + + return c, nil +} + +// addActivePodSchedulingGroupIndexer adds an indexer to look up active +// pods by their schedulingGroup.podGroupName field so we can efficiently +// determine whether a PodGroup still has active pods. +func addActivePodSchedulingGroupIndexer(indexer cache.Indexer) error { + return indexer.AddIndexers(cache.Indexers{ + activePodSchedulingGroupIndex: func(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, nil + } + if isPodTerminated(pod) { + return nil, nil + } + if pod.Spec.SchedulingGroup == nil || pod.Spec.SchedulingGroup.PodGroupName == nil { + return nil, nil + } + return []string{pod.Namespace + "/" + *pod.Spec.SchedulingGroup.PodGroupName}, nil + }, + }) +} + +// Run runs the controller goroutines. +func (c *Controller) Run(ctx context.Context, workers int) { + defer utilruntime.HandleCrash() + + logger := klog.FromContext(ctx) + logger.Info("Starting PodGroup protection controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down PodGroup protection controller") + c.queue.ShutDown() + wg.Wait() + }() + + if !cache.WaitForNamedCacheSyncWithContext(ctx, c.podGroupSynced, c.podSynced) { + return + } + + for range workers { + wg.Go(func() { + wait.UntilWithContext(ctx, c.runWorker, time.Second) + }) + } + <-ctx.Done() +} + +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + pgKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(pgKey) + + err := c.processPodGroup(ctx, pgKey) + if err == nil { + c.queue.Forget(pgKey) + return true + } + + c.queue.AddRateLimited(pgKey) + utilruntime.HandleError(fmt.Errorf("PodGroup %v failed with: %w", pgKey, err)) + + return true +} + +func (c *Controller) processPodGroup(ctx context.Context, pgKey string) error { + logger := klog.FromContext(ctx) + logger.V(4).Info("Processing PodGroup", "podGroup", pgKey) + + pgNamespace, pgName, err := cache.SplitMetaNamespaceKey(pgKey) + if err != nil { + return fmt.Errorf("error parsing PodGroup key %q: %w", pgKey, err) + } + + pg, err := c.podGroupLister.PodGroups(pgNamespace).Get(pgName) + if apierrors.IsNotFound(err) { + logger.V(4).Info("PodGroup not found, ignoring", "podGroup", pgKey) + return nil + } + if err != nil { + return err + } + + if !protectionutil.IsDeletionCandidate(pg, scheduling.PodGroupProtectionFinalizer) { + return nil + } + + isUsed, err := c.hasActivePods(ctx, pg) + if err != nil { + return err + } + if !isUsed { + return c.removeFinalizer(ctx, pg) + } + logger.V(4).Info("Keeping PodGroup finalizer because it is still being used by pods", "podGroup", klog.KObj(pg)) + return nil +} + +func (c *Controller) removeFinalizer(ctx context.Context, pg *schedulingv1alpha2.PodGroup) error { + logger := klog.FromContext(ctx) + pgClone := pg.DeepCopy() + + pgClone.Finalizers = slice.RemoveString(pgClone.Finalizers, scheduling.PodGroupProtectionFinalizer, nil) + _, err := c.kubeClient.SchedulingV1alpha2().PodGroups(pgClone.Namespace).Update(ctx, pgClone, metav1.UpdateOptions{}) + if err != nil { + logger.Error(err, "Error removing protection finalizer from PodGroup", "podGroup", klog.KObj(pg)) + return err + } + + logger.V(3).Info("Removed protection finalizer from PodGroup", "podGroup", klog.KObj(pg)) + return nil +} + +// hasActivePods returns true if any active pods reference the PodGroup +// via spec.schedulingGroup.podGroupName. The index only contains +// non-terminated pods, so a non-empty result means the PodGroup is still in use. +func (c *Controller) hasActivePods(ctx context.Context, pg *schedulingv1alpha2.PodGroup) (bool, error) { + logger := klog.FromContext(ctx) + indexKey := pg.Namespace + "/" + pg.Name + + objs, err := c.podIndexer.ByIndex(activePodSchedulingGroupIndex, indexKey) + if err != nil { + return false, fmt.Errorf("index-based list of active pods failed for PodGroup %s: %w", indexKey, err) + } + + if len(objs) > 0 { + logger.V(4).Info("Pod is using PodGroup", "pod", klog.KObj(objs[0].(*v1.Pod)), "podGroup", klog.KObj(pg)) + return true, nil + } + + logger.V(4).Info("No active pods found using PodGroup", "podGroup", klog.KObj(pg)) + return false, nil +} + +// isPodTerminated returns true if the pod has completed (Succeeded or Failed). +func isPodTerminated(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed +} + +// handlePodGroupUpdate handles PodGroup add/update events. +// Only deletion candidates which are being deleted and have the finalizer need processing. +func (c *Controller) handlePodGroupUpdate(logger klog.Logger, obj interface{}) { + pg, ok := obj.(*schedulingv1alpha2.PodGroup) + if !ok { + utilruntime.HandleError(fmt.Errorf("PodGroup informer returned non-PodGroup object: %#v", obj)) + return + } + if !protectionutil.IsDeletionCandidate(pg, scheduling.PodGroupProtectionFinalizer) { + return + } + key, err := cache.MetaNamespaceKeyFunc(pg) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for PodGroup %#v: %w", pg, err)) + return + } + logger.V(4).Info("Got event on PodGroup", "podGroup", klog.KObj(pg)) + c.queue.Add(key) +} + +// handlePodChange handles Pod add/delete/update events. +// It enqueues the referenced PodGroup only when the event could affect +// finalizer decisions where the pod is deleted or transitioned to a terminal phase. +func (c *Controller) handlePodChange(logger klog.Logger, old, new interface{}) { + newPod := getPod(new) + oldPod := getPod(old) + + if newPod != nil && isPodTerminated(newPod) { + c.enqueuePodGroupForPod(logger, newPod) + } + + // An update notification might mask the deletion of a pod X and the + // following creation of a pod Y with the same namespaced name as X. If + // that's the case, X needs to be processed as well to handle the case + // where it was the last active pod keeping the finalizer on a PodGroup. + if newPod != nil && oldPod != nil && oldPod.UID != newPod.UID { + c.enqueuePodGroupForPod(logger, oldPod) + } + + if newPod == nil && oldPod != nil { + c.enqueuePodGroupForPod(logger, oldPod) + } +} + +// enqueuePodGroupForPod enqueues the PodGroup referenced by the pod. +// Callers are responsible for only passing pods whose state change could allow +// finalizer removal (deleted or transitioned to a terminal phase). +func (c *Controller) enqueuePodGroupForPod(logger klog.Logger, pod *v1.Pod) { + if pod.Spec.SchedulingGroup == nil || pod.Spec.SchedulingGroup.PodGroupName == nil { + return + } + + pgKey := pod.Namespace + "/" + *pod.Spec.SchedulingGroup.PodGroupName + logger.V(4).Info("Enqueuing PodGroup for pod event", "pod", klog.KObj(pod), "podGroup", pgKey) + c.queue.Add(pgKey) +} + +func getPod(obj interface{}) *v1.Pod { + if obj == nil { + return nil + } + pod, ok := obj.(*v1.Pod) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return nil + } + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod %#v", obj)) + return nil + } + } + return pod +} diff --git a/pkg/controller/scheduling/podgroupprotection/podgroup_protection_controller_test.go b/pkg/controller/scheduling/podgroupprotection/podgroup_protection_controller_test.go new file mode 100644 index 00000000000..e727a986a99 --- /dev/null +++ b/pkg/controller/scheduling/podgroupprotection/podgroup_protection_controller_test.go @@ -0,0 +1,551 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroupprotection + +import ( + "context" + "slices" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + schedulingv1alpha2 "k8s.io/api/scheduling/v1alpha2" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/utils/ptr" +) + +const ( + defaultNS = "default" + defaultPGName = "my-podgroup" + defaultPGUID = "pg-uid-1" +) + +func podGroup() *schedulingv1alpha2.PodGroup { + return &schedulingv1alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultPGName, + Namespace: defaultNS, + UID: defaultPGUID, + }, + } +} + +func withFinalizer(pg *schedulingv1alpha2.PodGroup) *schedulingv1alpha2.PodGroup { + pg.Finalizers = append(pg.Finalizers, scheduling.PodGroupProtectionFinalizer) + return pg +} + +func deletedPodGroup(pg *schedulingv1alpha2.PodGroup) *schedulingv1alpha2.PodGroup { + pg.DeletionTimestamp = &metav1.Time{} + return pg +} + +func podForPG(name string, pgName string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNS, + UID: types.UID(name + "-uid"), + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + SchedulingGroup: &v1.PodSchedulingGroup{ + PodGroupName: ptr.To(pgName), + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } +} + +func terminatedPod(pod *v1.Pod, phase v1.PodPhase) *v1.Pod { + pod.Status.Phase = phase + return pod +} + +func unscheduledPod(pod *v1.Pod) *v1.Pod { + pod.Spec.NodeName = "" + return pod +} + +func podWithoutSchedulingGroup(name string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNS, + UID: types.UID(name + "-uid"), + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } +} + +func TestIsPodTerminated(t *testing.T) { + tests := map[string]struct { + phase v1.PodPhase + want bool + }{ + "running": {phase: v1.PodRunning, want: false}, + "pending": {phase: v1.PodPending, want: false}, + "succeeded": {phase: v1.PodSucceeded, want: true}, + "failed": {phase: v1.PodFailed, want: true}, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + pod := &v1.Pod{Status: v1.PodStatus{Phase: tc.phase}} + if got := isPodTerminated(pod); got != tc.want { + t.Errorf("isPodTerminated(%v) = %v, want %v", tc.phase, got, tc.want) + } + }) + } +} + +func TestGetPod(t *testing.T) { + tests := map[string]struct { + obj interface{} + want bool + }{ + "nil": { + obj: nil, + want: false, + }, + "pod": { + obj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, + want: true, + }, + "tombstone with pod": { + obj: cache.DeletedFinalStateUnknown{ + Key: "default/p", + Obj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, + }, + want: true, + }, + "tombstone with non-pod": { + obj: cache.DeletedFinalStateUnknown{ + Key: "default/p", + Obj: &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, + }, + want: false, + }, + "non-pod object": { + obj: &v1.ConfigMap{}, + want: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got := getPod(tc.obj) + if (got != nil) != tc.want { + t.Errorf("parsePod() returned pod=%v, want non-nil=%v", got, tc.want) + } + }) + } +} + +func TestHandlePodChange(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + + tests := map[string]struct { + old interface{} + new interface{} + wantSize int + }{ + "deleted pod with schedulingGroup enqueues": { + old: podForPG("pod-1", defaultPGName), + new: nil, + wantSize: 1, + }, + "terminated pod enqueues": { + old: podForPG("pod-1", defaultPGName), + new: terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded), + wantSize: 1, + }, + "unscheduled new pod does not enqueue": { + old: nil, + new: unscheduledPod(podForPG("pod-1", defaultPGName)), + wantSize: 0, + }, + "scheduled running pod does not enqueue": { + old: nil, + new: podForPG("pod-1", defaultPGName), + wantSize: 0, + }, + "deleted pod without schedulingGroup does not enqueue": { + old: podWithoutSchedulingGroup("pod-1"), + new: nil, + wantSize: 0, + }, + "deleted pod with nil podGroupName does not enqueue": { + old: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: defaultNS}, + Spec: v1.PodSpec{ + SchedulingGroup: &v1.PodSchedulingGroup{PodGroupName: nil}, + }, + }, + new: nil, + wantSize: 0, + }, + "UID mismatch with terminated new pod, same PodGroup, deduplicates enqueue": { + old: func() *v1.Pod { + p := podForPG("pod-1", defaultPGName) + p.UID = "old-uid" + return p + }(), + new: func() interface{} { + p := terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded) + p.UID = "new-uid" + return p + }(), + wantSize: 1, + }, + "UID mismatch with terminated new pod referencing different PodGroup enqueues both": { + old: func() *v1.Pod { + p := podForPG("pod-1", defaultPGName) + p.UID = "old-uid" + return p + }(), + new: func() interface{} { + p := terminatedPod(podForPG("pod-1", "other-pg"), v1.PodFailed) + p.UID = "new-uid" + return p + }(), + wantSize: 2, + }, + "UID mismatch on update with non-terminated new pod enqueues only old pod": { + old: func() *v1.Pod { + p := podForPG("pod-1", defaultPGName) + p.UID = "old-uid" + return p + }(), + new: func() interface{} { + p := unscheduledPod(podForPG("pod-1", defaultPGName)) + p.UID = "new-uid" + return p + }(), + wantSize: 1, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + c := &Controller{ + queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), + } + defer c.queue.ShutDown() + c.handlePodChange(logger, tc.old, tc.new) + + if c.queue.Len() != tc.wantSize { + t.Errorf("queue size = %d, want %d", c.queue.Len(), tc.wantSize) + } + }) + } +} + +func TestPodGroupProtectionController(t *testing.T) { + tests := []struct { + name string + // Objects to seed into the fake client before the controller starts. + initialObjects []runtime.Object + // Pod to delete (by name in defaultNS) after the controller starts. + podToDelete string + // Whether the finalizer should be present on the PodGroup after the + // controller has finished processing. + expectFinalizer bool + }{ + { + name: "new PodGroup without finalizer, no action (admission plugin handles it)", + initialObjects: []runtime.Object{podGroup()}, + expectFinalizer: false, + }, + { + name: "new PodGroup without finalizer, active pod exists, then no action (admission plugin handles it)", + initialObjects: []runtime.Object{podGroup(), podForPG("pod-1", defaultPGName)}, + expectFinalizer: false, + }, + { + name: "PodGroup with finalizer, not being deleted, then no action", + initialObjects: []runtime.Object{withFinalizer(podGroup())}, + expectFinalizer: true, + }, + { + name: "deleted PodGroup with finalizer, no active pods, then finalizer is removed", + initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup()))}, + expectFinalizer: false, + }, + { + name: "deleted PodGroup with finalizer, active pod exists, then finalizer is kept", + initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), podForPG("pod-1", defaultPGName)}, + expectFinalizer: true, + }, + { + name: "deleted PodGroup with finalizer, only terminated pods, then finalizer is removed", + initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)}, + expectFinalizer: false, + }, + { + name: "deleted PodGroup with finalizer, mix of active and terminated pods, then finalizer is kept", + initialObjects: []runtime.Object{ + deletedPodGroup(withFinalizer(podGroup())), + podForPG("pod-active", defaultPGName), + terminatedPod(podForPG("pod-done", defaultPGName), v1.PodSucceeded), + }, + expectFinalizer: true, + }, + { + name: "PodGroup without finalizer, already deleted, then no action (not a deletion candidate)", + initialObjects: []runtime.Object{deletedPodGroup(podGroup())}, + expectFinalizer: false, + }, + { + name: "PodGroup without finalizer, already deleted, active pods exist, then no action (should not add finalizer to deleting object)", + initialObjects: []runtime.Object{deletedPodGroup(podGroup()), podForPG("pod-1", defaultPGName)}, + expectFinalizer: false, + }, + { + name: "pod deleted, PodGroup being deleted with finalizer, was last active pod, finalizer is removed", + initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), podForPG("pod-1", defaultPGName)}, + podToDelete: "pod-1", + expectFinalizer: false, + }, + { + name: "pod terminated succeeded, PodGroup being deleted with finalizer, was last active pod, then finalizer is removed", + initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)}, + expectFinalizer: false, + }, + { + name: "pod terminated failed, PodGroup being deleted with finalizer, was last active pod, then finalizer is removed", + initialObjects: []runtime.Object{deletedPodGroup(withFinalizer(podGroup())), terminatedPod(podForPG("pod-1", defaultPGName), v1.PodFailed)}, + expectFinalizer: false, + }, + { + name: "new unscheduled pod, PodGroup without finalizer, then no action (admission plugin handles it)", + initialObjects: []runtime.Object{podGroup(), unscheduledPod(podForPG("pod-1", defaultPGName))}, + expectFinalizer: false, + }, + { + name: "pod without schedulingGroup -> no PodGroup action", + initialObjects: []runtime.Object{withFinalizer(podGroup()), podWithoutSchedulingGroup("pod-1")}, + expectFinalizer: true, + }, + { + name: "terminated pod references non-existent PodGroup, controller handles gracefully", + initialObjects: []runtime.Object{terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded)}, + expectFinalizer: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + client := fake.NewClientset(test.initialObjects...) + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + pgInformer := informerFactory.Scheduling().V1alpha2().PodGroups() + podInformer := informerFactory.Core().V1().Pods() + + ctrl, err := NewPodGroupProtectionController(klog.FromContext(ctx), pgInformer, podInformer, client) + if err != nil { + t.Fatalf("unexpected error creating controller: %v", err) + } + + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + go ctrl.Run(ctx, 1) + + if test.podToDelete != "" { + if err := client.CoreV1().Pods(defaultNS).Delete(ctx, test.podToDelete, metav1.DeleteOptions{}); err != nil { + t.Fatalf("deleting pod: %v", err) + } + } + + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + pg, err := client.SchedulingV1alpha2().PodGroups(defaultNS).Get(ctx, defaultPGName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return !test.expectFinalizer, nil + } + if err != nil { + return false, err + } + hasFinalizer := slices.Contains(pg.Finalizers, scheduling.PodGroupProtectionFinalizer) + return hasFinalizer == test.expectFinalizer, nil + }); err != nil { + t.Fatalf("timed out waiting for expected finalizer state (want present=%v): %v", test.expectFinalizer, err) + } + }) + } +} + +func TestActivePodSchedulingGroupIndexer(t *testing.T) { + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + if err := addActivePodSchedulingGroupIndexer(indexer); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + pod1 := podForPG("pod-1", "pg-a") + _ = indexer.Add(pod1) + + pod2 := podWithoutSchedulingGroup("pod-2") + _ = indexer.Add(pod2) + + pod3 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-3", Namespace: defaultNS, UID: "pod-3-uid"}, + Spec: v1.PodSpec{ + SchedulingGroup: &v1.PodSchedulingGroup{PodGroupName: nil}, + }, + } + _ = indexer.Add(pod3) + + // Terminated pod should not appear in the index. + pod4 := terminatedPod(podForPG("pod-4", "pg-a"), v1.PodSucceeded) + _ = indexer.Add(pod4) + + objs, err := indexer.ByIndex(activePodSchedulingGroupIndex, defaultNS+"/pg-a") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(objs) != 1 { + t.Fatalf("expected 1 active pod for pg-a, got %d", len(objs)) + } + if objs[0].(*v1.Pod).Name != "pod-1" { + t.Errorf("expected pod-1, got %s", objs[0].(*v1.Pod).Name) + } + + // Nonexistent PodGroup should return empty. + objs, err = indexer.ByIndex(activePodSchedulingGroupIndex, defaultNS+"/nonexistent") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(objs) != 0 { + t.Errorf("expected 0 pods for nonexistent PodGroup, got %d", len(objs)) + } +} + +func TestHasActivePods(t *testing.T) { + tests := map[string]struct { + pods []runtime.Object + want bool + }{ + "no pods": { + pods: nil, + want: false, + }, + "active pod referencing PodGroup": { + pods: []runtime.Object{ + podForPG("pod-1", defaultPGName), + }, + want: true, + }, + "only terminated pods": { + pods: []runtime.Object{ + terminatedPod(podForPG("pod-1", defaultPGName), v1.PodSucceeded), + terminatedPod(podForPG("pod-2", defaultPGName), v1.PodFailed), + }, + want: false, + }, + "mix of active and terminated": { + pods: []runtime.Object{ + podForPG("pod-active", defaultPGName), + terminatedPod(podForPG("pod-done", defaultPGName), v1.PodSucceeded), + }, + want: true, + }, + "pods referencing different PodGroup": { + pods: []runtime.Object{ + podForPG("pod-1", "other-pg"), + }, + want: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + if err := addActivePodSchedulingGroupIndexer(indexer); err != nil { + t.Fatalf("unexpected error: %v", err) + } + for _, obj := range tc.pods { + _ = indexer.Add(obj) + } + + ctrl := &Controller{ + podIndexer: indexer, + } + pg := podGroup() + got, err := ctrl.hasActivePods(context.Background(), pg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tc.want { + t.Errorf("hasActivePods() = %v, want %v", got, tc.want) + } + }) + } +} + +func TestHandlePodGroupUpdate(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + + tests := map[string]struct { + pg *schedulingv1alpha2.PodGroup + wantSize int + }{ + "PodGroup without finalizer, not deleting/not enqueued": { + pg: podGroup(), + wantSize: 0, + }, + "PodGroup is deletion candidate -> enqueued": { + pg: deletedPodGroup(withFinalizer(podGroup())), + wantSize: 1, + }, + "PodGroup has finalizer, not deleting -> not enqueued": { + pg: withFinalizer(podGroup()), + wantSize: 0, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + c := &Controller{ + queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), + } + defer c.queue.ShutDown() + c.handlePodGroupUpdate(logger, tc.pg) + if c.queue.Len() != tc.wantSize { + t.Errorf("queue size = %d, want %d", c.queue.Len(), tc.wantSize) + } + }) + } +} diff --git a/pkg/controller/volume/protectionutil/utils.go b/pkg/controller/util/protectionutil/utils.go similarity index 100% rename from pkg/controller/volume/protectionutil/utils.go rename to pkg/controller/util/protectionutil/utils.go diff --git a/pkg/controller/volume/protectionutil/utils_test.go b/pkg/controller/util/protectionutil/utils_test.go similarity index 100% rename from pkg/controller/volume/protectionutil/utils_test.go rename to pkg/controller/util/protectionutil/utils_test.go diff --git a/pkg/controller/volume/protectionutil/wrappers.go b/pkg/controller/util/protectionutil/wrappers.go similarity index 100% rename from pkg/controller/volume/protectionutil/wrappers.go rename to pkg/controller/util/protectionutil/wrappers.go diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go index 526016d1dc0..e914da7699f 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go @@ -34,8 +34,8 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/component-helpers/storage/ephemeral" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller/util/protectionutil" "k8s.io/kubernetes/pkg/controller/volume/common" - "k8s.io/kubernetes/pkg/controller/volume/protectionutil" "k8s.io/kubernetes/pkg/util/slice" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) diff --git a/pkg/controller/volume/pvprotection/pv_protection_controller.go b/pkg/controller/volume/pvprotection/pv_protection_controller.go index 1b748226d04..161ef7a370b 100644 --- a/pkg/controller/volume/pvprotection/pv_protection_controller.go +++ b/pkg/controller/volume/pvprotection/pv_protection_controller.go @@ -33,7 +33,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/controller/volume/protectionutil" + "k8s.io/kubernetes/pkg/controller/util/protectionutil" "k8s.io/kubernetes/pkg/util/slice" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) diff --git a/pkg/controller/volume/vacprotection/vac_protection_controller.go b/pkg/controller/volume/vacprotection/vac_protection_controller.go index a9ede7f5da8..10f22d9d9e2 100644 --- a/pkg/controller/volume/vacprotection/vac_protection_controller.go +++ b/pkg/controller/volume/vacprotection/vac_protection_controller.go @@ -36,7 +36,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/controller/volume/protectionutil" + "k8s.io/kubernetes/pkg/controller/util/protectionutil" "k8s.io/kubernetes/pkg/util/slice" volumeutil "k8s.io/kubernetes/pkg/volume/util" "k8s.io/utils/ptr" diff --git a/pkg/controller/volume/vacprotection/vac_protection_controller_test.go b/pkg/controller/volume/vacprotection/vac_protection_controller_test.go index 537755801e8..93d7b827783 100644 --- a/pkg/controller/volume/vacprotection/vac_protection_controller_test.go +++ b/pkg/controller/volume/vacprotection/vac_protection_controller_test.go @@ -34,7 +34,7 @@ import ( clienttesting "k8s.io/client-go/testing" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/volume/protectionutil" + "k8s.io/kubernetes/pkg/controller/util/protectionutil" volumeutil "k8s.io/kubernetes/pkg/volume/util" "k8s.io/utils/dump" ) diff --git a/pkg/controlplane/apiserver/samples/generic/server/admission_test.go b/pkg/controlplane/apiserver/samples/generic/server/admission_test.go index a0951a9f7cc..f2334bb2d5f 100644 --- a/pkg/controlplane/apiserver/samples/generic/server/admission_test.go +++ b/pkg/controlplane/apiserver/samples/generic/server/admission_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" + "k8s.io/kubernetes/plugin/pkg/admission/scheduling/podgroupprotection" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" "k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize" "k8s.io/kubernetes/plugin/pkg/admission/storage/storageclass/setdefault" @@ -40,6 +41,7 @@ var intentionallyOffPlugins = sets.New[string]( setdefault.PluginName, // DefaultStorageClass resize.PluginName, // PersistentVolumeClaimResize storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection + podgroupprotection.PluginName, // PodGroupProtection podpriority.PluginName, // Priority nodetaint.PluginName, // TaintNodesByCondition runtimeclass.PluginName, // RuntimeClass diff --git a/pkg/kubeapiserver/options/plugins.go b/pkg/kubeapiserver/options/plugins.go index 5f86e9a9802..d69de6c4e88 100644 --- a/pkg/kubeapiserver/options/plugins.go +++ b/pkg/kubeapiserver/options/plugins.go @@ -53,6 +53,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" + "k8s.io/kubernetes/plugin/pkg/admission/scheduling/podgroupprotection" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount" "k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize" @@ -89,6 +90,7 @@ var AllOrderedPlugins = []string{ extendedresourcetoleration.PluginName, // ExtendedResourceToleration setdefault.PluginName, // DefaultStorageClass storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection + podgroupprotection.PluginName, // PodGroupProtection gc.PluginName, // OwnerReferencesPermissionEnforcement resize.PluginName, // PersistentVolumeClaimResize runtimeclass.PluginName, // RuntimeClass @@ -148,6 +150,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) { setdefault.Register(plugins) resize.Register(plugins) storageobjectinuseprotection.Register(plugins) + podgroupprotection.Register(plugins) certapproval.Register(plugins) certsigning.Register(plugins) ctbattest.Register(plugins) @@ -170,6 +173,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { validatingwebhook.PluginName, // ValidatingAdmissionWebhook resourcequota.PluginName, // ResourceQuota storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection + podgroupprotection.PluginName, // PodGroupProtection podpriority.PluginName, // Priority nodetaint.PluginName, // TaintNodesByCondition runtimeclass.PluginName, // RuntimeClass diff --git a/plugin/pkg/admission/scheduling/podgroupprotection/admission.go b/plugin/pkg/admission/scheduling/podgroupprotection/admission.go new file mode 100644 index 00000000000..bd7a9057536 --- /dev/null +++ b/plugin/pkg/admission/scheduling/podgroupprotection/admission.go @@ -0,0 +1,101 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroupprotection + +import ( + "context" + "fmt" + "io" + "slices" + + "k8s.io/apiserver/pkg/admission" + apiserveradmission "k8s.io/apiserver/pkg/admission/initializer" + "k8s.io/component-base/featuregate" + "k8s.io/klog/v2" + schedulingapi "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/features" +) + +const ( + PluginName = "PodGroupProtection" +) + +// Register registers the plugin. +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) { + return newPlugin(), nil + }) +} + +type podGroupProtectionPlugin struct { + *admission.Handler + enabled bool + inspectedFeatureGates bool +} + +var _ admission.MutationInterface = &podGroupProtectionPlugin{} +var _ apiserveradmission.WantsFeatures = &podGroupProtectionPlugin{} + +func newPlugin() *podGroupProtectionPlugin { + return &podGroupProtectionPlugin{ + Handler: admission.NewHandler(admission.Create), + } +} + +func (p *podGroupProtectionPlugin) InspectFeatureGates(featureGates featuregate.FeatureGate) { + p.enabled = featureGates.Enabled(features.GenericWorkload) + p.inspectedFeatureGates = true +} + +func (p *podGroupProtectionPlugin) ValidateInitialization() error { + if !p.inspectedFeatureGates { + return fmt.Errorf("feature gates not inspected") + } + return nil +} + +var podGroupResource = schedulingapi.Resource("podgroups") + +// Admit stamps the PodGroupProtectionFinalizer on every newly created PodGroup +// so that it cannot be deleted while pods still reference it. +// The finalizer is removed by the PodGroupProtection controller when the +// PodGroup is no longer in use. +func (p *podGroupProtectionPlugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { + if !p.enabled { + return nil + } + + if a.GetResource().GroupResource() != podGroupResource { + return nil + } + if len(a.GetSubresource()) != 0 { + return nil + } + + pg, ok := a.GetObject().(*schedulingapi.PodGroup) + if !ok { + return nil + } + + if slices.Contains(pg.Finalizers, schedulingapi.PodGroupProtectionFinalizer) { + return nil + } + + klog.V(4).InfoS("Adding protection finalizer to PodGroup", "podGroup", klog.KObj(pg)) + pg.Finalizers = append(pg.Finalizers, schedulingapi.PodGroupProtectionFinalizer) + return nil +} diff --git a/plugin/pkg/admission/scheduling/podgroupprotection/admission_test.go b/plugin/pkg/admission/scheduling/podgroupprotection/admission_test.go new file mode 100644 index 00000000000..2837a1e0af4 --- /dev/null +++ b/plugin/pkg/admission/scheduling/podgroupprotection/admission_test.go @@ -0,0 +1,111 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroupprotection + +import ( + "context" + "reflect" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/admission" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + schedulingapi "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/features" + "k8s.io/utils/dump" +) + +func TestAdmit(t *testing.T) { + pg := &schedulingapi.PodGroup{ + TypeMeta: metav1.TypeMeta{Kind: "PodGroup"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-podgroup", + Namespace: "default", + }, + } + + pgWithFinalizer := pg.DeepCopy() + pgWithFinalizer.Finalizers = []string{schedulingapi.PodGroupProtectionFinalizer} + + tests := []struct { + name string + enabled bool + resource schema.GroupVersionResource + object runtime.Object + expectedObject runtime.Object + namespace string + }{ + { + name: "podgroup create with plugin enabled, add finalizer", + enabled: true, + resource: schedulingapi.SchemeGroupVersion.WithResource("podgroups"), + object: pg, + expectedObject: pgWithFinalizer, + namespace: pg.Namespace, + }, + { + name: "podgroup finalizer already exists, no new finalizer", + enabled: true, + resource: schedulingapi.SchemeGroupVersion.WithResource("podgroups"), + object: pgWithFinalizer, + expectedObject: pgWithFinalizer, + namespace: pgWithFinalizer.Namespace, + }, + { + name: "podgroup create with plugin disabled, no finalizer added", + enabled: false, + resource: schedulingapi.SchemeGroupVersion.WithResource("podgroups"), + object: pg, + expectedObject: pg, + namespace: pg.Namespace, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, test.enabled) + + ctrl := newPlugin() + ctrl.InspectFeatureGates(utilfeature.DefaultFeatureGate) + + obj := test.object.DeepCopyObject() + attrs := admission.NewAttributesRecord( + obj, + obj.DeepCopyObject(), + schema.GroupVersionKind{}, + test.namespace, + "foo", + test.resource, + "", + admission.Create, + &metav1.CreateOptions{}, + false, + nil, + ) + + if err := ctrl.Admit(context.TODO(), attrs, nil); err != nil { + t.Errorf("got unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expectedObject, obj) { + t.Errorf("Expected object:\n%s\ngot:\n%s", dump.Pretty(test.expectedObject), dump.Pretty(obj)) + } + }) + } +} diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 3a1fd11c407..47af5f7f488 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -466,6 +466,16 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) }) } + if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) { + addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "podgroup-protection-controller"}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("get", "list", "watch", "update").Groups(schedulingGroup).Resources("podgroups").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), + }, + }) + } + addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-after-finished-controller"}, Rules: []rbacv1.PolicyRule{ diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go index 21417d18d90..8c8a2d9746c 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go @@ -21,8 +21,12 @@ import ( "slices" "testing" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" ) // rolesWithAllowStar are the controller roles which are allowed to contain a *. These are @@ -38,7 +42,7 @@ var rolesWithAllowStar = sets.NewString( ) // TestNoStarsForControllers confirms that no controller role has star verbs, groups, -// or resources. There are three known exceptions: namespace lifecycle and GC which have to +// or resources. There are three known exceptions: namespace lifecycle and GC which have to // delete anything, and HPA, which has the power to read metrics associated // with any object. func TestNoStarsForControllers(t *testing.T) { @@ -93,6 +97,80 @@ func TestControllerRoleLabel(t *testing.T) { } } +func TestPodGroupProtectionControllerRBAC(t *testing.T) { + roleName := saRolePrefix + "podgroup-protection-controller" + + tests := []struct { + name string + enableFeatureGate bool + expectRole bool + }{ + { + name: "role and binding absent when GenericWorkload is disabled", + enableFeatureGate: false, + expectRole: false, + }, + { + name: "role and binding present when GenericWorkload is enabled", + enableFeatureGate: true, + expectRole: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericWorkload, test.enableFeatureGate) + + var foundRole *rbacv1.ClusterRole + for i, role := range ControllerRoles() { + if role.Name == roleName { + foundRole = &ControllerRoles()[i] + break + } + } + + if !test.expectRole { + if foundRole != nil { + t.Fatalf("role %q should not exist when GenericWorkload is disabled", roleName) + } + return + } + + if foundRole == nil { + t.Fatalf("role %q not found when GenericWorkload is enabled", roleName) + } + + wantRules := []rbacv1.PolicyRule{ + {Verbs: []string{"get", "list", "update", "watch"}, APIGroups: []string{"scheduling.k8s.io"}, Resources: []string{"podgroups"}}, + {Verbs: []string{"get", "list", "watch"}, APIGroups: []string{""}, Resources: []string{"pods"}}, + } + if !reflect.DeepEqual(foundRole.Rules, wantRules) { + t.Errorf("unexpected rules:\ngot: %+v\nwant: %+v", foundRole.Rules, wantRules) + } + + var foundBinding *rbacv1.ClusterRoleBinding + for i, binding := range ControllerRoleBindings() { + if binding.RoleRef.Name == roleName { + foundBinding = &ControllerRoleBindings()[i] + break + } + } + if foundBinding == nil { + t.Fatalf("binding for %q not found", roleName) + } + if len(foundBinding.Subjects) != 1 { + t.Fatalf("expected 1 subject, got %d", len(foundBinding.Subjects)) + } + if got, want := foundBinding.Subjects[0].Name, "podgroup-protection-controller"; got != want { + t.Errorf("subject name = %q, want %q", got, want) + } + if got, want := foundBinding.Subjects[0].Namespace, "kube-system"; got != want { + t.Errorf("subject namespace = %q, want %q", got, want) + } + }) + } +} + func TestControllerRoleVerbsConsistency(t *testing.T) { roles := ControllerRoles() for _, role := range roles {