From d274e05cc99970462954e3a4796b2c076bd2e071 Mon Sep 17 00:00:00 2001 From: Michael Aspinwall Date: Thu, 13 Nov 2025 00:06:30 +0000 Subject: [PATCH] Remove CRD stored versions from status upon SVM migration --- .../app/storageversionmigrator.go | 12 + .../storagemigration/validation/validation.go | 28 +- .../validation/validation_test.go | 12 - .../storageversionmigrator/migrationrunner.go | 424 ++++++++++++++++ .../migrationrunner_test.go | 471 ++++++++++++++++++ .../storageversionmigrator/resourceversion.go | 5 + .../resourceversion_test.go | 60 ++- .../storageversionmigrator.go | 11 - .../storageversionmigrator_test.go | 66 +-- pkg/controller/storageversionmigrator/util.go | 8 - pkg/features/kube_features.go | 1 + .../rbac/bootstrappolicy/controller_policy.go | 2 + .../pkg/apis/apiextensions/v1/types.go | 3 + .../pkg/features/kube_features.go | 1 + .../reference/feature_list.md | 2 +- .../reference/versioned_feature_list.yaml | 4 + .../storageversionmigrator_test.go | 7 +- .../storageversionmigrator/util.go | 24 +- 18 files changed, 1012 insertions(+), 129 deletions(-) create mode 100644 pkg/controller/storageversionmigrator/migrationrunner.go create mode 100644 pkg/controller/storageversionmigrator/migrationrunner_test.go diff --git a/cmd/kube-controller-manager/app/storageversionmigrator.go b/cmd/kube-controller-manager/app/storageversionmigrator.go index 42c3ea1bef1..35eeb8eea16 100644 --- a/cmd/kube-controller-manager/app/storageversionmigrator.go +++ b/cmd/kube-controller-manager/app/storageversionmigrator.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" @@ -69,6 +70,11 @@ func newSVMController(ctx context.Context, controllerContext ControllerContext, informer := controllerContext.InformerFactory.Storagemigration().V1beta1().StorageVersionMigrations() + crdClientset, err := apiextensionsclientset.NewForConfig(config) + if err != nil { + return nil, err + } + dynamicClient, err := dynamic.NewForConfig(config) if err != nil { return nil, err @@ -102,5 +108,11 @@ func newSVMController(ctx context.Context, controllerContext ControllerContext, informer, controllerContext.RESTMapper, ).Run, + svm.NewCustomResourceController( + ctx, + client, + informer, + crdClientset.ApiextensionsV1().CustomResourceDefinitions(), + ).Run, ), controllerName), nil } diff --git a/pkg/apis/storagemigration/validation/validation.go b/pkg/apis/storagemigration/validation/validation.go index efcfb3618e7..fc8c3c78f0f 100644 --- a/pkg/apis/storagemigration/validation/validation.go +++ b/pkg/apis/storagemigration/validation/validation.go @@ -70,12 +70,14 @@ func ValidateStorageVersionMigrationStatusUpdate(newSVMBundle, oldSVMBundle *sto fldPath := field.NewPath("status") // resource version should be a non-negative integer - cmp, err := resourceversion.CompareResourceVersion(newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion) - if err != nil || cmp != 0 { - if err == nil { - err = fmt.Errorf("unable to compare resource versions, %s is not equal to %s", newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion) + if len(newSVMBundle.Status.ResourceVersion) != 0 { + cmp, err := resourceversion.CompareResourceVersion(newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion) + if err != nil || cmp != 0 { + if err == nil { + err = fmt.Errorf("unable to compare resource versions, %s is not equal to %s", newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion) + } + allErrs = append(allErrs, field.Invalid(fldPath.Child("resourceVersion"), newSVMBundle.Status.ResourceVersion, err.Error())) } - allErrs = append(allErrs, field.Invalid(fldPath.Child("resourceVersion"), newSVMBundle.Status.ResourceVersion, err.Error())) } allErrs = append(allErrs, metav1validation.ValidateConditions(newSVMBundle.Status.Conditions, fldPath.Child("conditions"))...) @@ -90,14 +92,6 @@ func ValidateStorageVersionMigrationStatusUpdate(newSVMBundle, oldSVMBundle *sto allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), newSVMBundle.Status.Conditions, "Both success and failed conditions cannot be true at the same time")) } - // running must be false when success is true or failed is true - if isSuccessful(newSVMBundle) && isRunning(newSVMBundle) { - allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), newSVMBundle.Status.Conditions, "Running condition cannot be true when success condition is true")) - } - if isFailed(newSVMBundle) && isRunning(newSVMBundle) { - allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), newSVMBundle.Status.Conditions, "Running condition cannot be true when failed condition is true")) - } - // success cannot be set to false once it is true isOldSuccessful := isSuccessful(oldSVMBundle) if isOldSuccessful && !isSuccessful(newSVMBundle) { @@ -126,11 +120,3 @@ func isFailed(svm *storagemigration.StorageVersionMigration) bool { } return false } - -func isRunning(svm *storagemigration.StorageVersionMigration) bool { - runningCondition := metaconditions.FindStatusCondition(svm.Status.Conditions, string(storagemigration.MigrationRunning)) - if runningCondition != nil && runningCondition.Status == metav1.ConditionTrue { - return true - } - return false -} diff --git a/pkg/apis/storagemigration/validation/validation_test.go b/pkg/apis/storagemigration/validation/validation_test.go index e6dabecca30..f21e4e804dc 100644 --- a/pkg/apis/storagemigration/validation/validation_test.go +++ b/pkg/apis/storagemigration/validation/validation_test.go @@ -295,18 +295,6 @@ func TestValidateStorageVersionMigrationStatusUpdate(t *testing.T) { oldSVM: newTestSVM("123", runningCond), errorSubstring: "Both success and failed conditions cannot be true at the same time", }, - { - name: "invalid: both succeeded and running are true", - newSVM: newTestSVM("123", succeededCond, runningCond), - oldSVM: newTestSVM("123", runningCond), - errorSubstring: "Running condition cannot be true when success condition is true", - }, - { - name: "invalid: both failed and running are true", - newSVM: newTestSVM("123", failedCond, runningCond), - oldSVM: newTestSVM("123", runningCond), - errorSubstring: "Running condition cannot be true when failed condition is true", - }, { name: "invalid: succeeded changed from true to false", newSVM: newTestSVM("123", succeededFalseCond), diff --git a/pkg/controller/storageversionmigrator/migrationrunner.go b/pkg/controller/storageversionmigrator/migrationrunner.go new file mode 100644 index 00000000000..a1d617e7741 --- /dev/null +++ b/pkg/controller/storageversionmigrator/migrationrunner.go @@ -0,0 +1,424 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversionmigrator + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "k8s.io/apiextensions-apiserver/pkg/apihelpers" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + applyconfigurationapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/applyconfiguration/apiextensions/v1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + svmv1beta1 "k8s.io/api/storagemigration/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + svminformers "k8s.io/client-go/informers/storagemigration/v1beta1" + clientset "k8s.io/client-go/kubernetes" + svmlisters "k8s.io/client-go/listers/storagemigration/v1beta1" +) + +const MigrationRunnerControllerName string = "migration-runner-controller" + +// MigrationRunnerController is responsible for managing the lifecycle of +// StorageVersionMigration resources. It will ensure that only a single StorageVersionMigration +// will be active at a time for a given GroupResource, by adding the MigrationRunning condition +// to the SVMs that are active. Only these SVMs will be processed by the downstream migration controllers. +// +// It adds the StorageMigrating condition to the CRD and is in charge of removing old +// stored versions from the CRD status. +type MigrationRunnerController struct { + svmListers svmlisters.StorageVersionMigrationLister + svmSynced cache.InformerSynced + queue workqueue.TypedRateLimitingInterface[metav1.GroupResource] + kubeClient clientset.Interface + crdClient apiextensionsclient.CustomResourceDefinitionInterface +} + +func NewCustomResourceController( + ctx context.Context, + kubeClient clientset.Interface, + svmInformer svminformers.StorageVersionMigrationInformer, + crdClient apiextensionsclient.CustomResourceDefinitionInterface, +) *MigrationRunnerController { + logger := klog.FromContext(ctx) + + crController := &MigrationRunnerController{ + kubeClient: kubeClient, + svmListers: svmInformer.Lister(), + svmSynced: svmInformer.Informer().HasSynced, + crdClient: crdClient, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[metav1.GroupResource](), + workqueue.TypedRateLimitingQueueConfig[metav1.GroupResource]{Name: ResourceVersionControllerName}, + ), + } + + _, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + crController.addSVM(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + crController.updateSVM(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + crController.deleteSVM(logger, obj) + }, + }) + + return crController +} + +func (crc *MigrationRunnerController) addSVM(logger klog.Logger, obj interface{}) { + svm := obj.(*svmv1beta1.StorageVersionMigration) + logger.V(4).Info("Adding", "svm", klog.KObj(svm)) + crc.enqueue(svm) +} + +func (crc *MigrationRunnerController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) { + oldSVM := oldObj.(*svmv1beta1.StorageVersionMigration) + newSVM := newObj.(*svmv1beta1.StorageVersionMigration) + logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM)) + crc.enqueue(newSVM) +} + +func (crc *MigrationRunnerController) deleteSVM(logger klog.Logger, obj interface{}) { + svm, ok := obj.(*svmv1beta1.StorageVersionMigration) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + logger.Info("could not cast obj to DeletedFinalStateUnknown", "object", obj) + return + } + svm, ok = tombstone.Obj.(*svmv1beta1.StorageVersionMigration) + if !ok { + logger.Info("could not cast tombstone to SVM", "object", obj) + return + } + } + logger.V(4).Info("Deleting", "svm", klog.KObj(svm)) + crc.enqueue(svm) +} + +func (crc *MigrationRunnerController) enqueue(svm *svmv1beta1.StorageVersionMigration) { + crc.queue.Add(svm.Spec.Resource) +} + +func (crc *MigrationRunnerController) Run(ctx context.Context) { + defer utilruntime.HandleCrash() + + logger := klog.FromContext(ctx) + logger.Info("Starting", "controller", MigrationRunnerControllerName) + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down", "controller", MigrationRunnerControllerName) + crc.queue.ShutDown() + wg.Wait() + }() + + if !cache.WaitForNamedCacheSyncWithContext(ctx, crc.svmSynced) { + return + } + + wg.Go(func() { + wait.UntilWithContext(ctx, crc.worker, time.Second) + }) + <-ctx.Done() +} + +func (crc *MigrationRunnerController) worker(ctx context.Context) { + for crc.processNext(ctx) { + } +} + +func (crc *MigrationRunnerController) processNext(ctx context.Context) bool { + key, quit := crc.queue.Get() + if quit { + return false + } + defer crc.queue.Done(key) + + err := crc.sync(ctx, key) + if err == nil { + crc.queue.Forget(key) + return true + } + + utilruntime.HandleErrorWithContext(ctx, err, "Error syncing SVM resource, retrying", "svm", key) + crc.queue.AddRateLimited(key) + + return true +} + +func (crc *MigrationRunnerController) sync(ctx context.Context, resource metav1.GroupResource) error { + // Get all SVMs for this resource and trigger re-sync + svmsToCleanup, svmToPromote, err := crc.getSVMsForResource(resource) + if err != nil { + return err + } + + for _, svm := range svmsToCleanup { + if err := crc.cleanupAdmission(ctx, svm); err != nil { + return err + } + } + + if svmToPromote != nil { + if err := crc.markAsActive(ctx, svmToPromote); err != nil { + return err + } + } + + return nil +} + +func (crc *MigrationRunnerController) cleanupAdmission(ctx context.Context, svm *svmv1beta1.StorageVersionMigration) error { + needsUpdate := false + migratingCond := meta.FindStatusCondition(svm.Status.Conditions, string(svmv1beta1.MigrationRunning)) + if migratingCond != nil && migratingCond.Status == metav1.ConditionTrue { + newCond := &metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionFalse, + Reason: "MigrationCompleted", + Message: "Migration completed", + ObservedGeneration: svm.Generation, + } + + meta.SetStatusCondition(&svm.Status.Conditions, *newCond) + needsUpdate = true + } + + if needsUpdate { + if err := crc.cleanupCRD(ctx, svm); err != nil { + return err + } + + _, err := crc.kubeClient.StoragemigrationV1beta1(). + StorageVersionMigrations(). + UpdateStatus( + ctx, + svm, + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + } + + return nil +} + +func (crc *MigrationRunnerController) markAsActive(ctx context.Context, svm *svmv1beta1.StorageVersionMigration) error { + // Mark CRD as undergoing migration. + crd, exists, err := crc.crdForGroupResource(ctx, svm.Spec.Resource) + if err != nil { + return err + } + if exists { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + crd, err := crc.crdClient.Get(ctx, crd.Name, metav1.GetOptions{}) + if err != nil { + return err + } + applyConfig := applyconfigurationapiextensionsv1.CustomResourceDefinition(crd.Name). + // We add resource version to ensure there were no changes to the CRD + // between our get and apply, since if there was that may affect the + // generation we set in the condition. + WithResourceVersion(crd.ResourceVersion). + WithStatus(applyconfigurationapiextensionsv1.CustomResourceDefinitionStatus(). + WithConditions(applyconfigurationapiextensionsv1.CustomResourceDefinitionCondition(). + WithType(apiextensionsv1.StorageMigrating). + WithStatus(apiextensionsv1.ConditionTrue). + WithLastTransitionTime(metav1.Now()). + WithReason("MigrationRunning"). + WithMessage(fmt.Sprintf("Migration %s is running", svm.Name)). + WithObservedGeneration(crd.Generation))) + + // TODO: We should see if there's a way to surface non-conflict/not-found errors + // to the SVM Status to let the user know that the migration is stuck due to issues + // outside of the controller's control. + _, err = crc.crdClient.ApplyStatus(ctx, applyConfig, metav1.ApplyOptions{FieldManager: MigrationRunnerControllerName, Force: true}) + return err + }) + if err != nil { + return err + } + } + + svm = setStatusConditions(svm, svmv1beta1.MigrationRunning, "MigrationRunning", "The migration is running") + _, err = crc.kubeClient.StoragemigrationV1beta1(). + StorageVersionMigrations(). + UpdateStatus( + ctx, + svm, + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + + return nil +} + +func compareSVM(a, b *svmv1beta1.StorageVersionMigration) int { + if i := a.CreationTimestamp.Compare(b.CreationTimestamp.Time); i != 0 { + return i + } + return strings.Compare(a.Name, b.Name) +} + +func (crc *MigrationRunnerController) getSVMsForResource(resource metav1.GroupResource) (svmsToCleanup []*svmv1beta1.StorageVersionMigration, svmToPromote *svmv1beta1.StorageVersionMigration, err error) { + svms, err := crc.svmListers.List(labels.Everything()) + if err != nil { + return nil, nil, err + } + svmInProgress := false + for _, svm := range svms { + if svm.Spec.Resource != resource { + continue + } + svmRunning := isRunning(svm) + svmTerminal := isTerminal(svm) + promotable := !svmRunning && !svmTerminal && !svmInProgress + switch { + case svmRunning && svmTerminal: + svmsToCleanup = append(svmsToCleanup, svm.DeepCopy()) + + case svmRunning: + svmInProgress = true + svmToPromote = nil + + case promotable && (svmToPromote == nil || compareSVM(svm, svmToPromote) == -1): + svmToPromote = svm + } + } + + if svmToPromote != nil { + svmToPromote = svmToPromote.DeepCopy() + } + return svmsToCleanup, svmToPromote, nil +} + +func isTerminal(svm *svmv1beta1.StorageVersionMigration) bool { + return meta.IsStatusConditionTrue(svm.Status.Conditions, string(svmv1beta1.MigrationSucceeded)) || + meta.IsStatusConditionTrue(svm.Status.Conditions, string(svmv1beta1.MigrationFailed)) +} + +func isRunning(svm *svmv1beta1.StorageVersionMigration) bool { + return meta.IsStatusConditionTrue(svm.Status.Conditions, string(svmv1beta1.MigrationRunning)) +} + +func (crc *MigrationRunnerController) cleanupCRD(ctx context.Context, toBeProcessedSVM *svmv1beta1.StorageVersionMigration) error { + migratingSuccessfulCond := meta.FindStatusCondition(toBeProcessedSVM.Status.Conditions, string(svmv1beta1.MigrationSucceeded)) + success := migratingSuccessfulCond != nil && migratingSuccessfulCond.Status == metav1.ConditionTrue + crd, exists, err := crc.crdForGroupResource(ctx, toBeProcessedSVM.Spec.Resource) + if err != nil { + return err + } + if !exists { + return nil + } + if err := crc.finishCRDMigration(ctx, crd, success); err != nil { + return err + } + + return nil +} + +func (crc *MigrationRunnerController) finishCRDMigration(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, success bool) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + crd, err := crc.crdClient.Get(ctx, crd.Name, metav1.GetOptions{}) + if err != nil { + return err + } + migratingCond := apihelpers.FindCRDCondition(crd, apiextensionsv1.StorageMigrating) + var canUpdateStoredVersions bool + if migratingCond != nil && migratingCond.ObservedGeneration == crd.Generation { + canUpdateStoredVersions = true + } + + var reason, msg string + if success { + reason = "MigrationSucceeded" + msg = "The migration has succeeded" + if canUpdateStoredVersions { + msg += " and the stored versions have been updated" + } else { + msg += " but the stored versions have not been updated due to a generation mismatch" + } + } else { + reason = "MigrationFailed" + msg = "The migration has failed" + } + + applyConfig := applyconfigurationapiextensionsv1.CustomResourceDefinition(crd.Name). + // We add resource version to ensure there were no changes to the CRD + // between our get and apply, since if there was that may affect the validity + // of the stored versions we are about to set. + WithResourceVersion(crd.ResourceVersion). + WithStatus(applyconfigurationapiextensionsv1.CustomResourceDefinitionStatus(). + WithConditions(applyconfigurationapiextensionsv1.CustomResourceDefinitionCondition(). + WithType(apiextensionsv1.StorageMigrating). + WithStatus(apiextensionsv1.ConditionFalse). + WithLastTransitionTime(metav1.Now()). + WithReason(reason). + WithMessage(msg). + WithObservedGeneration(crd.Generation))) + + if canUpdateStoredVersions { + // Wipe out all stored versions that we have migrated off of if nothing else + // has transpired while the migration was in progress. + for _, version := range crd.Spec.Versions { + if version.Storage { + applyConfig.Status.WithStoredVersions(version.Name) + break + } + } + } + + // TODO: We should see if there's a way to surface non-conflict/not-found errors + // to the SVM Status to let the user know that the migration is stuck due to issues + // outside of the controller's control. + _, err = crc.crdClient.ApplyStatus(ctx, applyConfig, metav1.ApplyOptions{FieldManager: MigrationRunnerControllerName, Force: true}) + return err + }) +} + +func (crc *MigrationRunnerController) crdForGroupResource(ctx context.Context, gr metav1.GroupResource) (*apiextensionsv1.CustomResourceDefinition, bool, error) { + crdName := fmt.Sprintf("%s.%s", gr.Resource, gr.Group) + crd, err := crc.crdClient.Get(ctx, crdName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil, false, nil + } + if err != nil { + return nil, false, err + } + return crd, true, nil +} diff --git a/pkg/controller/storageversionmigrator/migrationrunner_test.go b/pkg/controller/storageversionmigrator/migrationrunner_test.go new file mode 100644 index 00000000000..9320d665c1c --- /dev/null +++ b/pkg/controller/storageversionmigrator/migrationrunner_test.go @@ -0,0 +1,471 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversionmigrator + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" + svmv1beta1 "k8s.io/api/storagemigration/v1beta1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" + apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + svminformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" +) + +func init() { + _ = apiextensionsv1.AddToScheme(apiextensionsscheme.Scheme) +} + +func TestCustomResourceController_Sync(t *testing.T) { + newSVM := func(name, group, resource string, conditions ...metav1.Condition) *svmv1beta1.StorageVersionMigration { + return &svmv1beta1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: svmv1beta1.StorageVersionMigrationSpec{ + Resource: metav1.GroupResource{Group: group, Resource: resource}, + }, + Status: svmv1beta1.StorageVersionMigrationStatus{ + Conditions: conditions, + }, + } + } + + newCRD := func(name string, generation int64, storageVersion string) *apiextensionsv1.CustomResourceDefinition { + return &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Generation: generation, + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1", Storage: storageVersion == "v1"}, + {Name: "v2", Storage: storageVersion == "v2"}, + }, + }, + } + } + + newCRDPatchAction := func(name string, conditions []apiextensionsv1.CustomResourceDefinitionCondition, storedVersions []string) k8stesting.PatchAction { + statusMap := map[string]interface{}{ + "conditions": conditions, + } + if storedVersions != nil { + statusMap["storedVersions"] = storedVersions + } + patchObj := map[string]interface{}{ + "status": statusMap, + } + patch, err := json.Marshal(patchObj) + if err != nil { + panic(err) + } + return k8stesting.NewPatchAction( + apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), + "", + name, + types.MergePatchType, + patch, + ) + } + + testCases := []struct { + name string + svms []*svmv1beta1.StorageVersionMigration + crd *apiextensionsv1.CustomResourceDefinition + expectErr bool + expectKubeActions []k8stesting.Action + expectCRDActions []k8stesting.Action + }{ + { + name: "Initial sync: CRD missing", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets")}, + expectErr: false, + expectKubeActions: []k8stesting.Action{ + k8stesting.NewUpdateAction( + svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), + "", + newSVM("test-svm", "example.com", "widgets", metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + Reason: "MigrationRunning", + Message: "The migration is running", + }), + ), + }, + }, + { + name: "Initial sync: CRD exists", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets")}, + crd: newCRD("widgets.example.com", 1, "v2"), + expectErr: false, + expectKubeActions: []k8stesting.Action{ + k8stesting.NewUpdateAction( + svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), + "", + newSVM("test-svm", "example.com", "widgets", metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + Reason: "MigrationRunning", + Message: "The migration is running", + }), + ), + }, + expectCRDActions: []k8stesting.Action{ + newCRDPatchAction( + "widgets.example.com", + []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.StorageMigrating, + Status: apiextensionsv1.ConditionTrue, + Reason: "MigrationRunning", + Message: "Migration test-svm is running", + }, + }, + nil, + ), + }, + }, + { + name: "Migration running (No-op)", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets", metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + })}, + crd: newCRD("widgets.example.com", 1, "v2"), + expectErr: false, + }, + { + name: "Migration succeeded: Update CRD stored versions", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets", + metav1.Condition{ + Type: string(svmv1beta1.MigrationSucceeded), + Status: metav1.ConditionTrue, + }, + metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + )}, + crd: func() *apiextensionsv1.CustomResourceDefinition { + c := newCRD("widgets.example.com", 1, "v2") + c.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.StorageMigrating, + Status: apiextensionsv1.ConditionTrue, + ObservedGeneration: 1, + Reason: "MigrationRunning", + }, + } + return c + }(), + expectErr: false, + expectCRDActions: []k8stesting.Action{ + newCRDPatchAction( + "widgets.example.com", + []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.StorageMigrating, + Status: apiextensionsv1.ConditionFalse, + Reason: "MigrationSucceeded", + Message: "The migration has succeeded and the stored versions have been updated", + }, + }, + []string{"v2"}, + ), + }, + }, + { + name: "Migration succeeded: CRD generation mismatch (No update)", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets", + metav1.Condition{ + Type: string(svmv1beta1.MigrationSucceeded), + Status: metav1.ConditionTrue, + }, + metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + )}, + crd: newCRD("widgets.example.com", 2, "v2"), + expectErr: false, + expectCRDActions: []k8stesting.Action{ + newCRDPatchAction( + "widgets.example.com", + []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.StorageMigrating, + Status: apiextensionsv1.ConditionFalse, + Reason: "MigrationSucceeded", + Message: "The migration has succeeded but the stored versions have not been updated due to a generation mismatch", + }, + }, + nil, + ), + }, + }, + { + name: "Migration failed: CRD condition updated", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets", + metav1.Condition{ + Type: string(svmv1beta1.MigrationFailed), + Status: metav1.ConditionTrue, + }, + metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + )}, + crd: func() *apiextensionsv1.CustomResourceDefinition { + c := newCRD("widgets.example.com", 1, "v2") + c.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.StorageMigrating, + Status: apiextensionsv1.ConditionTrue, + ObservedGeneration: 1, + Reason: "MigrationRunning", + }, + } + return c + }(), + expectErr: false, + expectCRDActions: []k8stesting.Action{ + newCRDPatchAction( + "widgets.example.com", + []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.StorageMigrating, + Status: apiextensionsv1.ConditionFalse, + Reason: "MigrationFailed", + Message: "The migration has failed", + }, + }, + []string{"v2"}, + ), + }, + }, + { + name: "Cleanup admission updates SVM condition", + svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets", + metav1.Condition{ + Type: string(svmv1beta1.MigrationSucceeded), + Status: metav1.ConditionTrue, + }, + metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + Reason: "MigrationRunning", + ObservedGeneration: 1, + }, + )}, + expectErr: false, + expectKubeActions: []k8stesting.Action{ + k8stesting.NewUpdateSubresourceAction( + svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), + "status", + "", + newSVM("test-svm", "example.com", "widgets", + metav1.Condition{ + Type: string(svmv1beta1.MigrationSucceeded), + Status: metav1.ConditionTrue, + }, + metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionFalse, + Reason: "MigrationCompleted", + }, + ), + ), + }, + }, + { + name: "Promote older SVM first", + svms: []*svmv1beta1.StorageVersionMigration{ + func() *svmv1beta1.StorageVersionMigration { + svm := newSVM("test-svm-newer", "example.com", "widgets") + svm.CreationTimestamp = metav1.Time{Time: time.Now().Add(1 * time.Hour)} + return svm + }(), + func() *svmv1beta1.StorageVersionMigration { + svm := newSVM("test-svm-older", "example.com", "widgets") + svm.CreationTimestamp = metav1.Time{Time: time.Now().Add(-1 * time.Hour)} + return svm + }(), + }, + expectErr: false, + expectKubeActions: []k8stesting.Action{ + k8stesting.NewUpdateAction( + svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), + "", + newSVM("test-svm-older", "example.com", "widgets", metav1.Condition{ + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + Reason: "MigrationRunning", + Message: "The migration is running", + }), + ), + }, + }, + } + + filterActions := func(actions []k8stesting.Action) []k8stesting.Action { + var ret []k8stesting.Action + for _, action := range actions { + if action.GetVerb() == "list" || action.GetVerb() == "watch" || action.GetVerb() == "get" { + continue + } + ret = append(ret, action) + } + return ret + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + var initialSVMs []runtime.Object + for _, svm := range tc.svms { + initialSVMs = append(initialSVMs, svm) + } + kubeClient := kubefake.NewClientset(initialSVMs...) + + kubeInformerFactory := svminformers.NewSharedInformerFactory(kubeClient, 0) + svmInformer := kubeInformerFactory.Storagemigration().V1beta1().StorageVersionMigrations() + for _, svm := range tc.svms { + err := svmInformer.Informer().GetStore().Add(svm) + require.NoError(t, err) + } + + var initialCRDs []runtime.Object + if tc.crd != nil { + initialCRDs = append(initialCRDs, tc.crd) + } + crdClientSet := apiextensionsfake.NewClientset(initialCRDs...) + + crdScheme := runtime.NewScheme() + _ = apiextensionsv1.AddToScheme(crdScheme) + crdTracker := k8stesting.NewObjectTracker(crdScheme, serializer.NewCodecFactory(crdScheme).UniversalDecoder()) + + for _, obj := range initialCRDs { + _ = crdTracker.Add(obj) + } + + crdClientSet.PrependReactor("*", "*", k8stesting.ObjectReaction(crdTracker)) + + controller := NewCustomResourceController( + ctx, + kubeClient, + svmInformer, + crdClientSet.ApiextensionsV1().CustomResourceDefinitions(), + ) + + resource := metav1.GroupResource{Group: "example.com", Resource: "widgets"} + if len(tc.svms) > 0 { + resource = tc.svms[0].Spec.Resource + } + + err := controller.sync(ctx, resource) + + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + if tc.expectKubeActions != nil { + kubeActions := filterActions(kubeClient.Actions()) + require.Len(t, kubeActions, len(tc.expectKubeActions), "mismatched number of kube client actions") + + for i, expected := range tc.expectKubeActions { + actual := kubeActions[i] + require.Equal(t, expected.GetVerb(), actual.GetVerb(), "kube action %d: verb mismatch", i) + require.Equal(t, expected.GetResource(), actual.GetResource(), "kube action %d: resource mismatch", i) + + if updateAction, ok := actual.(k8stesting.UpdateAction); ok { + actualObj := updateAction.GetObject().(*svmv1beta1.StorageVersionMigration) + expectedObj := expected.(k8stesting.UpdateAction).GetObject().(*svmv1beta1.StorageVersionMigration) + + require.Equal(t, expectedObj.Name, actualObj.Name, "object name mismatch") + require.Len(t, actualObj.Status.Conditions, len(expectedObj.Status.Conditions), "condition count mismatch") + for j, expCond := range expectedObj.Status.Conditions { + actCond := actualObj.Status.Conditions[j] + require.Equal(t, expCond.Type, actCond.Type, "condition type mismatch") + require.Equal(t, expCond.Status, actCond.Status, "condition status mismatch") + if expCond.Reason != "" { + require.Equal(t, expCond.Reason, actCond.Reason, "condition reason mismatch") + } + } + } + } + } + + // Verify CRD status updates + if tc.expectCRDActions != nil { + crdActions := filterActions(crdClientSet.Actions()) + require.Len(t, crdActions, len(tc.expectCRDActions), "mismatched number of crd client actions") + + for i, expected := range tc.expectCRDActions { + actual := crdActions[i] + require.Equal(t, expected.GetVerb(), actual.GetVerb(), "crd action %d: verb mismatch", i) + + if patchAction, ok := actual.(k8stesting.PatchAction); ok { + actualPatch := patchAction.GetPatch() + expectedPatch := expected.(k8stesting.PatchAction).GetPatch() + + var actualMap, expectedMap map[string]interface{} + require.NoError(t, json.Unmarshal(actualPatch, &actualMap)) + require.NoError(t, json.Unmarshal(expectedPatch, &expectedMap)) + + // Helper to verify conditions ignoring LastTransitionTime + actualStatus := actualMap["status"].(map[string]interface{}) + expectedStatus := expectedMap["status"].(map[string]interface{}) + + if expectedConds, ok := expectedStatus["conditions"].([]interface{}); ok { + actualConds := actualStatus["conditions"].([]interface{}) + require.Len(t, actualConds, len(expectedConds)) + for k, expC := range expectedConds { + expCondMap := expC.(map[string]interface{}) + actCondMap := actualConds[k].(map[string]interface{}) + require.Equal(t, expCondMap["type"], actCondMap["type"]) + require.Equal(t, expCondMap["status"], actCondMap["status"]) + require.Equal(t, expCondMap["reason"], actCondMap["reason"]) + require.Equal(t, expCondMap["message"], actCondMap["message"]) + } + // Remove conditions for deep equal of the rest + delete(actualStatus, "conditions") + delete(expectedStatus, "conditions") + } + + require.Equal(t, expectedStatus, actualStatus) + } + } + } + }) + } +} diff --git a/pkg/controller/storageversionmigrator/resourceversion.go b/pkg/controller/storageversionmigrator/resourceversion.go index 2c708288933..fc6bc345bfa 100644 --- a/pkg/controller/storageversionmigrator/resourceversion.go +++ b/pkg/controller/storageversionmigrator/resourceversion.go @@ -189,6 +189,11 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error } // working with copy to avoid race condition between this and migration controller toBeProcessedSVM := svm.DeepCopy() + + if !meta.IsStatusConditionTrue(toBeProcessedSVM.Status.Conditions, string(svmv1beta1.MigrationRunning)) { + logger.V(4).Info("Migration is not running yet, skipping", "svm", name) + return nil + } gr := toBeProcessedSVM.Spec.Resource if meta.IsStatusConditionTrue(toBeProcessedSVM.Status.Conditions, string(svmv1beta1.MigrationSucceeded)) || diff --git a/pkg/controller/storageversionmigrator/resourceversion_test.go b/pkg/controller/storageversionmigrator/resourceversion_test.go index 60e4bd40cc1..b25737b0c78 100644 --- a/pkg/controller/storageversionmigrator/resourceversion_test.go +++ b/pkg/controller/storageversionmigrator/resourceversion_test.go @@ -206,10 +206,20 @@ func TestRVSync(t *testing.T) { expectErr string expectKubeActions []kubetesting.Action }{ + { + name: "Migration not active", + key: "inactive-svm", + svm: newSVMWithConditions("inactive-svm", "", []metav1.Condition{}), + }, { name: "Successful RV acquisition", key: "test-svm", - svm: newSVM("test-svm", ""), + svm: newSVMWithConditions("test-svm", "", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + }), discoveryResources: &metav1.APIResourceList{ GroupVersion: "apps/v1", APIResources: []metav1.APIResource{ @@ -225,7 +235,12 @@ func TestRVSync(t *testing.T) { kubetesting.NewUpdateAction( svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", - newSVM("test-svm", "12345"), + newSVMWithConditions("test-svm", "12345", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + }), ), }, }, @@ -234,6 +249,11 @@ func TestRVSync(t *testing.T) { key: "non-existent-svm", svm: nil, }, + { + name: "SVM has no CRD condition", + key: "succeeded-svm", + svm: newSVM("succeeded-svm", ""), + }, { name: "SVM already succeeded", key: "succeeded-svm", @@ -257,12 +277,22 @@ func TestRVSync(t *testing.T) { { name: "RV already set", key: "rv-set-svm", - svm: newSVM("rv-set-svm", "123"), + svm: newSVMWithConditions("rv-set-svm", "123", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + }), }, { name: "Resource not migratable", key: "not-migratable-svm", - svm: newSVM("not-migratable-svm", ""), + svm: newSVMWithConditions("not-migratable-svm", "", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + }), discoveryResources: &metav1.APIResourceList{ GroupVersion: "apps/v1", APIResources: []metav1.APIResource{ @@ -274,6 +304,10 @@ func TestRVSync(t *testing.T) { svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", newSVMWithConditions("not-migratable-svm", "", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, { Type: string(svmv1beta1.MigrationFailed), Status: metav1.ConditionTrue, @@ -285,7 +319,12 @@ func TestRVSync(t *testing.T) { { name: "Metadata list error", key: "metadata-error-svm", - svm: newSVM("metadata-error-svm", ""), + svm: newSVMWithConditions("metadata-error-svm", "", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + }), discoveryResources: &metav1.APIResourceList{ GroupVersion: "apps/v1", APIResources: []metav1.APIResource{ @@ -299,7 +338,12 @@ func TestRVSync(t *testing.T) { { name: "Invalid RV returned", key: "invalid-rv-svm", - svm: newSVM("invalid-rv-svm", ""), + svm: newSVMWithConditions("invalid-rv-svm", "", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, + }), discoveryResources: &metav1.APIResourceList{ GroupVersion: "apps/v1", APIResources: []metav1.APIResource{ @@ -316,6 +360,10 @@ func TestRVSync(t *testing.T) { svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", newSVMWithConditions("invalid-rv-svm", "", []metav1.Condition{ + { + Type: string(svmv1beta1.MigrationRunning), + Status: metav1.ConditionTrue, + }, { Type: string(svmv1beta1.MigrationFailed), Status: metav1.ConditionTrue, diff --git a/pkg/controller/storageversionmigrator/storageversionmigrator.go b/pkg/controller/storageversionmigrator/storageversionmigrator.go index 73c5a8bb066..7efdb4c9d51 100644 --- a/pkg/controller/storageversionmigrator/storageversionmigrator.go +++ b/pkg/controller/storageversionmigrator/storageversionmigrator.go @@ -262,17 +262,6 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error { return fmt.Errorf("GC cache is not up to date, requeuing to attempt again. gcListResourceVersion: %s, listResourceVersion: %s", gcListResourceVersion, listResourceVersion) } - toBeProcessedSVM, err = svmc.kubeClient.StoragemigrationV1beta1(). - StorageVersionMigrations(). - UpdateStatus( - ctx, - setStatusConditions(toBeProcessedSVM, svmv1beta1.MigrationRunning, migrationRunningStatusReason, ""), - metav1.UpdateOptions{}, - ) - if err != nil { - return err - } - err, failedMigration := svmc.runMigration(ctx, *gvr, resourceMonitor, toBeProcessedSVM, listResourceVersion) if err != nil { return err diff --git a/pkg/controller/storageversionmigrator/storageversionmigrator_test.go b/pkg/controller/storageversionmigrator/storageversionmigrator_test.go index 01abee13248..9393e31739d 100644 --- a/pkg/controller/storageversionmigrator/storageversionmigrator_test.go +++ b/pkg/controller/storageversionmigrator/storageversionmigrator_test.go @@ -175,22 +175,10 @@ func TestSync(t *testing.T) { }, expectErr: false, expectKubeActions: []k8stesting.Action{ - k8stesting.NewUpdateAction( - svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), - "", - newSVMWithConditions("test-svm", "100", []metav1.Condition{{ - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionTrue, - }}), - ), k8stesting.NewUpdateAction( svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionFalse, - }, { Type: string(svmv1beta1.MigrationSucceeded), Status: metav1.ConditionTrue, @@ -268,22 +256,10 @@ func TestSync(t *testing.T) { }, expectErr: false, expectKubeActions: []k8stesting.Action{ - k8stesting.NewUpdateAction( - svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), - "", - newSVMWithConditions("test-svm", "100", []metav1.Condition{{ - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionTrue, - }}), - ), k8stesting.NewUpdateAction( svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionFalse, - }, { Type: string(svmv1beta1.MigrationFailed), Status: metav1.ConditionTrue, @@ -311,20 +287,6 @@ func TestSync(t *testing.T) { svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionTrue, - }, - }), - ), - k8stesting.NewUpdateAction( - svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), - "", - newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionFalse, - }, { Type: string(svmv1beta1.MigrationSucceeded), Status: metav1.ConditionTrue, @@ -359,18 +321,6 @@ func TestSync(t *testing.T) { "ns1/res1": apierrors.NewTooManyRequests("simulating throttling", 1), }, expectErr: true, - expectKubeActions: []k8stesting.Action{ - k8stesting.NewUpdateAction( - svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), - "", - newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionTrue, - }, - }), - ), - }, expectDynamicActions: []k8stesting.Action{ k8stesting.NewPatchAction(testGVR, "ns1", "res1", types.ApplyPatchType, mustMarshal(t, typeMetaUIDRV{ TypeMeta: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "Deployment"}, @@ -422,20 +372,6 @@ func TestSync(t *testing.T) { svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), "", newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionTrue, - }, - }), - ), - k8stesting.NewUpdateAction( - svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"), - "", - newSVMWithConditions("test-svm", "100", []metav1.Condition{ - { - Type: string(svmv1beta1.MigrationRunning), - Status: metav1.ConditionFalse, - }, { Type: string(svmv1beta1.MigrationFailed), Status: metav1.ConditionTrue, @@ -484,7 +420,7 @@ func TestSync(t *testing.T) { if tc.expectKubeActions != nil { kubeActions := filterActions(kubeClient.Actions()) - require.Len(t, kubeActions, len(tc.expectKubeActions), "mismatched number of kube client actions") + require.Len(t, kubeActions, len(tc.expectKubeActions), "mismatched number of kube client actions, expected %d, got %d", len(tc.expectKubeActions), len(kubeActions)) for i, expected := range tc.expectKubeActions { actual := kubeActions[i] diff --git a/pkg/controller/storageversionmigrator/util.go b/pkg/controller/storageversionmigrator/util.go index 78f64e605e0..0c7d7b2b253 100644 --- a/pkg/controller/storageversionmigrator/util.go +++ b/pkg/controller/storageversionmigrator/util.go @@ -34,14 +34,6 @@ func setStatusConditions( return toBeUpdatedSVM } - if conditionType == svmv1beta1.MigrationSucceeded || conditionType == svmv1beta1.MigrationFailed { - // set running condition to false if we're finished - runningCond := meta.FindStatusCondition(toBeUpdatedSVM.Status.Conditions, string(svmv1beta1.MigrationRunning)) - if runningCond != nil { - runningCond.Status = metav1.ConditionFalse - } - } - toBeUpdatedSVM.Status.Conditions = append(toBeUpdatedSVM.Status.Conditions, metav1.Condition{ Type: string(conditionType), Status: metav1.ConditionTrue, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index ff1af01ec5e..5e6a85e8be8 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1967,6 +1967,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate apiextensionsfeatures.CRDObservedGenerationTracking: { {Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Beta}, + {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, }, apiextensionsfeatures.CRDValidationRatcheting: { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 7da3758d885..3a1fd11c407 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -535,6 +535,8 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) // need patch for SSA of any resource // need create because SSA of a deleted resource will be interpreted as a create request, these always fail with a conflict error because UID is set rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(), + rbacv1helpers.NewRule("get").Groups("apiextensions.k8s.io").Resources("customresourcedefinitions").RuleOrDie(), + rbacv1helpers.NewRule("patch").Groups("apiextensions.k8s.io").Resources("customresourcedefinitions/status").RuleOrDie(), rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(), }, }) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1/types.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1/types.go index 764726b6e9a..b3a8ce7ef46 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1/types.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1/types.go @@ -332,6 +332,9 @@ const ( // be true if .metadata.annotations["api-approved.kubernetes.io"] is set to a URL, otherwise it will be false. // See https://github.com/kubernetes/enhancements/pull/1111 for more details. KubernetesAPIApprovalPolicyConformant CustomResourceDefinitionConditionType = "KubernetesAPIApprovalPolicyConformant" + // StorageMigrating indicates that the underlying storage version of the CRD + // is undergoing migration. + StorageMigrating CustomResourceDefinitionConditionType = "StorageMigrating" ) // CustomResourceDefinitionCondition contains details for the current condition of this pod. diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go index e59d6213f61..13ede31c1bd 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go @@ -65,6 +65,7 @@ func init() { var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate.VersionedSpecs{ CRDObservedGenerationTracking: { {Version: version.MustParse("1.35"), PreRelease: featuregate.Beta, Default: false}, + {Version: version.MustParse("1.36"), PreRelease: featuregate.Beta, Default: true}, }, CRDValidationRatcheting: { {Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Alpha}, diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index 0a3ba234c72..f07ca28fc7a 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -26,7 +26,7 @@ | CPUManagerPolicyAlphaOptions | | | 1.23– | | | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyAlphaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyAlphaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | CPUManagerPolicyBetaOptions | :ballot_box_with_check: 1.23+ | | | 1.23– | | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyBetaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyBetaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | CPUManagerPolicyOptions | :ballot_box_with_check: 1.23+ | :closed_lock_with_key: 1.33+ | 1.22 | 1.23–1.32 | 1.33– | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | -| CRDObservedGenerationTracking | | | | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| CRDObservedGenerationTracking | :ballot_box_with_check: 1.36+ | | | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | CRDValidationRatcheting | :ballot_box_with_check: 1.30+ | :closed_lock_with_key: 1.33+ | 1.28–1.29 | 1.30–1.32 | 1.33– | | | [code](https://cs.k8s.io/?q=%5CbCRDValidationRatcheting%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDValidationRatcheting%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | CSIServiceAccountTokenSecrets | :ballot_box_with_check: 1.35+ | :closed_lock_with_key: 1.36+ | | 1.35 | 1.36– | | | [code](https://cs.k8s.io/?q=%5CbCSIServiceAccountTokenSecrets%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCSIServiceAccountTokenSecrets%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | CSIVolumeHealth | | | 1.21– | | | | | [code](https://cs.k8s.io/?q=%5CbCSIVolumeHealth%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCSIVolumeHealth%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index b586d4f38e1..3d3a8f543a2 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -357,6 +357,10 @@ lockToDefault: false preRelease: Beta version: "1.35" + - default: true + lockToDefault: false + preRelease: Beta + version: "1.36" - name: CRDValidationRatcheting versionedSpecs: - default: false diff --git a/test/integration/storageversionmigrator/storageversionmigrator_test.go b/test/integration/storageversionmigrator/storageversionmigrator_test.go index de91f4e1a22..4ecc884c4ac 100644 --- a/test/integration/storageversionmigrator/storageversionmigrator_test.go +++ b/test/integration/storageversionmigrator/storageversionmigrator_test.go @@ -26,6 +26,7 @@ import ( "go.uber.org/goleak" + extensionfeatures "k8s.io/apiextensions-apiserver/pkg/features" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" @@ -154,6 +155,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) { featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ features.StorageVersionMigrator: true, featuregate.Feature(clientgofeaturegate.InformerResourceVersion): true, + extensionfeatures.CRDObservedGenerationTracking: true, }) // decode errors are expected when using conversation webhooks etcd3watcher.TestOnlySetFatalOnDecodeError(false) @@ -262,7 +264,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) { if err != nil { t.Fatalf("Failed to create SVM resource: %v", err) } - if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, "triggercr"); !ok { + if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, crd.Name, "triggercr"); !ok { t.Fatalf("CRD not migrated") } @@ -303,6 +305,7 @@ func TestStorageVersionMigrationDuringChaos(t *testing.T) { featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ features.StorageVersionMigrator: true, featuregate.Feature(clientgofeaturegate.InformerResourceVersion): true, + extensionfeatures.CRDObservedGenerationTracking: true, }) ctx := ktesting.Init(t) @@ -343,7 +346,7 @@ func TestStorageVersionMigrationDuringChaos(t *testing.T) { return } triggerCRName := "chaos-trigger-" + strconv.Itoa(i) - if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, triggerCRName); !ok { + if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, crd.Name, triggerCRName); !ok { t.Errorf("CRD not migrated") return } diff --git a/test/integration/storageversionmigrator/util.go b/test/integration/storageversionmigrator/util.go index aaf79cce637..e2c64f593a5 100644 --- a/test/integration/storageversionmigrator/util.go +++ b/test/integration/storageversionmigrator/util.go @@ -1054,6 +1054,24 @@ func (svm *svmTest) setupServerCert(t *testing.T) *certContext { } } +func (svm *svmTest) crdMigrated(t *testing.T, crdName string) bool { + t.Helper() + + crd, err := svm.apiextensionsclient.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), crdName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + + var storedVersion string + for _, version := range crd.Spec.Versions { + if version.Storage { + storedVersion = version.Name + } + } + + return len(crd.Status.StoredVersions) == 1 && crd.Status.StoredVersions[0] == storedVersion +} + func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bool { t.Helper() @@ -1072,7 +1090,7 @@ func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bo return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version) } -func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, triggerCRName string) bool { +func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, crdName, triggerCRName string) bool { t.Helper() var triggerOnce sync.Once @@ -1099,8 +1117,8 @@ func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, return false, nil } - if metaconditions.IsStatusConditionTrue(svmConditions, string(svmv1beta1.MigrationSucceeded)) { - t.Logf("%q SVM has completed migration", crdSVMName) + if metaconditions.IsStatusConditionTrue(svmConditions, string(svmv1beta1.MigrationSucceeded)) && svm.crdMigrated(t, crdName) { + t.Logf("%q SVM has completed migration for crd %s", crdSVMName, crdName) return true, nil }