mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Merge pull request #135297 from michaelasp/svmUpdateCRD
Remove CRD stored versions from status upon SVM migration
This commit is contained in:
commit
38940f0222
18 changed files with 1012 additions and 129 deletions
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/dynamic"
|
||||
|
|
@ -69,6 +70,11 @@ func newSVMController(ctx context.Context, controllerContext ControllerContext,
|
|||
|
||||
informer := controllerContext.InformerFactory.Storagemigration().V1beta1().StorageVersionMigrations()
|
||||
|
||||
crdClientset, err := apiextensionsclientset.NewForConfig(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dynamicClient, err := dynamic.NewForConfig(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -102,5 +108,11 @@ func newSVMController(ctx context.Context, controllerContext ControllerContext,
|
|||
informer,
|
||||
controllerContext.RESTMapper,
|
||||
).Run,
|
||||
svm.NewCustomResourceController(
|
||||
ctx,
|
||||
client,
|
||||
informer,
|
||||
crdClientset.ApiextensionsV1().CustomResourceDefinitions(),
|
||||
).Run,
|
||||
), controllerName), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,12 +70,14 @@ func ValidateStorageVersionMigrationStatusUpdate(newSVMBundle, oldSVMBundle *sto
|
|||
fldPath := field.NewPath("status")
|
||||
|
||||
// resource version should be a non-negative integer
|
||||
cmp, err := resourceversion.CompareResourceVersion(newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion)
|
||||
if err != nil || cmp != 0 {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("unable to compare resource versions, %s is not equal to %s", newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion)
|
||||
if len(newSVMBundle.Status.ResourceVersion) != 0 {
|
||||
cmp, err := resourceversion.CompareResourceVersion(newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion)
|
||||
if err != nil || cmp != 0 {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("unable to compare resource versions, %s is not equal to %s", newSVMBundle.Status.ResourceVersion, newSVMBundle.Status.ResourceVersion)
|
||||
}
|
||||
allErrs = append(allErrs, field.Invalid(fldPath.Child("resourceVersion"), newSVMBundle.Status.ResourceVersion, err.Error()))
|
||||
}
|
||||
allErrs = append(allErrs, field.Invalid(fldPath.Child("resourceVersion"), newSVMBundle.Status.ResourceVersion, err.Error()))
|
||||
}
|
||||
|
||||
allErrs = append(allErrs, metav1validation.ValidateConditions(newSVMBundle.Status.Conditions, fldPath.Child("conditions"))...)
|
||||
|
|
@ -90,14 +92,6 @@ func ValidateStorageVersionMigrationStatusUpdate(newSVMBundle, oldSVMBundle *sto
|
|||
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), newSVMBundle.Status.Conditions, "Both success and failed conditions cannot be true at the same time"))
|
||||
}
|
||||
|
||||
// running must be false when success is true or failed is true
|
||||
if isSuccessful(newSVMBundle) && isRunning(newSVMBundle) {
|
||||
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), newSVMBundle.Status.Conditions, "Running condition cannot be true when success condition is true"))
|
||||
}
|
||||
if isFailed(newSVMBundle) && isRunning(newSVMBundle) {
|
||||
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), newSVMBundle.Status.Conditions, "Running condition cannot be true when failed condition is true"))
|
||||
}
|
||||
|
||||
// success cannot be set to false once it is true
|
||||
isOldSuccessful := isSuccessful(oldSVMBundle)
|
||||
if isOldSuccessful && !isSuccessful(newSVMBundle) {
|
||||
|
|
@ -126,11 +120,3 @@ func isFailed(svm *storagemigration.StorageVersionMigration) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isRunning(svm *storagemigration.StorageVersionMigration) bool {
|
||||
runningCondition := metaconditions.FindStatusCondition(svm.Status.Conditions, string(storagemigration.MigrationRunning))
|
||||
if runningCondition != nil && runningCondition.Status == metav1.ConditionTrue {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -295,18 +295,6 @@ func TestValidateStorageVersionMigrationStatusUpdate(t *testing.T) {
|
|||
oldSVM: newTestSVM("123", runningCond),
|
||||
errorSubstring: "Both success and failed conditions cannot be true at the same time",
|
||||
},
|
||||
{
|
||||
name: "invalid: both succeeded and running are true",
|
||||
newSVM: newTestSVM("123", succeededCond, runningCond),
|
||||
oldSVM: newTestSVM("123", runningCond),
|
||||
errorSubstring: "Running condition cannot be true when success condition is true",
|
||||
},
|
||||
{
|
||||
name: "invalid: both failed and running are true",
|
||||
newSVM: newTestSVM("123", failedCond, runningCond),
|
||||
oldSVM: newTestSVM("123", runningCond),
|
||||
errorSubstring: "Running condition cannot be true when failed condition is true",
|
||||
},
|
||||
{
|
||||
name: "invalid: succeeded changed from true to false",
|
||||
newSVM: newTestSVM("123", succeededFalseCond),
|
||||
|
|
|
|||
424
pkg/controller/storageversionmigrator/migrationrunner.go
Normal file
424
pkg/controller/storageversionmigrator/migrationrunner.go
Normal file
|
|
@ -0,0 +1,424 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storageversionmigrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiextensions-apiserver/pkg/apihelpers"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
applyconfigurationapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/applyconfiguration/apiextensions/v1"
|
||||
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
svmv1beta1 "k8s.io/api/storagemigration/v1beta1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
svminformers "k8s.io/client-go/informers/storagemigration/v1beta1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
svmlisters "k8s.io/client-go/listers/storagemigration/v1beta1"
|
||||
)
|
||||
|
||||
const MigrationRunnerControllerName string = "migration-runner-controller"
|
||||
|
||||
// MigrationRunnerController is responsible for managing the lifecycle of
|
||||
// StorageVersionMigration resources. It will ensure that only a single StorageVersionMigration
|
||||
// will be active at a time for a given GroupResource, by adding the MigrationRunning condition
|
||||
// to the SVMs that are active. Only these SVMs will be processed by the downstream migration controllers.
|
||||
//
|
||||
// It adds the StorageMigrating condition to the CRD and is in charge of removing old
|
||||
// stored versions from the CRD status.
|
||||
type MigrationRunnerController struct {
|
||||
svmListers svmlisters.StorageVersionMigrationLister
|
||||
svmSynced cache.InformerSynced
|
||||
queue workqueue.TypedRateLimitingInterface[metav1.GroupResource]
|
||||
kubeClient clientset.Interface
|
||||
crdClient apiextensionsclient.CustomResourceDefinitionInterface
|
||||
}
|
||||
|
||||
func NewCustomResourceController(
|
||||
ctx context.Context,
|
||||
kubeClient clientset.Interface,
|
||||
svmInformer svminformers.StorageVersionMigrationInformer,
|
||||
crdClient apiextensionsclient.CustomResourceDefinitionInterface,
|
||||
) *MigrationRunnerController {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
crController := &MigrationRunnerController{
|
||||
kubeClient: kubeClient,
|
||||
svmListers: svmInformer.Lister(),
|
||||
svmSynced: svmInformer.Informer().HasSynced,
|
||||
crdClient: crdClient,
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[metav1.GroupResource](),
|
||||
workqueue.TypedRateLimitingQueueConfig[metav1.GroupResource]{Name: ResourceVersionControllerName},
|
||||
),
|
||||
}
|
||||
|
||||
_, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
crController.addSVM(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
crController.updateSVM(logger, oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
crController.deleteSVM(logger, obj)
|
||||
},
|
||||
})
|
||||
|
||||
return crController
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) addSVM(logger klog.Logger, obj interface{}) {
|
||||
svm := obj.(*svmv1beta1.StorageVersionMigration)
|
||||
logger.V(4).Info("Adding", "svm", klog.KObj(svm))
|
||||
crc.enqueue(svm)
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) {
|
||||
oldSVM := oldObj.(*svmv1beta1.StorageVersionMigration)
|
||||
newSVM := newObj.(*svmv1beta1.StorageVersionMigration)
|
||||
logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM))
|
||||
crc.enqueue(newSVM)
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) deleteSVM(logger klog.Logger, obj interface{}) {
|
||||
svm, ok := obj.(*svmv1beta1.StorageVersionMigration)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
logger.Info("could not cast obj to DeletedFinalStateUnknown", "object", obj)
|
||||
return
|
||||
}
|
||||
svm, ok = tombstone.Obj.(*svmv1beta1.StorageVersionMigration)
|
||||
if !ok {
|
||||
logger.Info("could not cast tombstone to SVM", "object", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
logger.V(4).Info("Deleting", "svm", klog.KObj(svm))
|
||||
crc.enqueue(svm)
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) enqueue(svm *svmv1beta1.StorageVersionMigration) {
|
||||
crc.queue.Add(svm.Spec.Resource)
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) Run(ctx context.Context) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Starting", "controller", MigrationRunnerControllerName)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
logger.Info("Shutting down", "controller", MigrationRunnerControllerName)
|
||||
crc.queue.ShutDown()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
if !cache.WaitForNamedCacheSyncWithContext(ctx, crc.svmSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Go(func() {
|
||||
wait.UntilWithContext(ctx, crc.worker, time.Second)
|
||||
})
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) worker(ctx context.Context) {
|
||||
for crc.processNext(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) processNext(ctx context.Context) bool {
|
||||
key, quit := crc.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer crc.queue.Done(key)
|
||||
|
||||
err := crc.sync(ctx, key)
|
||||
if err == nil {
|
||||
crc.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing SVM resource, retrying", "svm", key)
|
||||
crc.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) sync(ctx context.Context, resource metav1.GroupResource) error {
|
||||
// Get all SVMs for this resource and trigger re-sync
|
||||
svmsToCleanup, svmToPromote, err := crc.getSVMsForResource(resource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, svm := range svmsToCleanup {
|
||||
if err := crc.cleanupAdmission(ctx, svm); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if svmToPromote != nil {
|
||||
if err := crc.markAsActive(ctx, svmToPromote); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) cleanupAdmission(ctx context.Context, svm *svmv1beta1.StorageVersionMigration) error {
|
||||
needsUpdate := false
|
||||
migratingCond := meta.FindStatusCondition(svm.Status.Conditions, string(svmv1beta1.MigrationRunning))
|
||||
if migratingCond != nil && migratingCond.Status == metav1.ConditionTrue {
|
||||
newCond := &metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionFalse,
|
||||
Reason: "MigrationCompleted",
|
||||
Message: "Migration completed",
|
||||
ObservedGeneration: svm.Generation,
|
||||
}
|
||||
|
||||
meta.SetStatusCondition(&svm.Status.Conditions, *newCond)
|
||||
needsUpdate = true
|
||||
}
|
||||
|
||||
if needsUpdate {
|
||||
if err := crc.cleanupCRD(ctx, svm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := crc.kubeClient.StoragemigrationV1beta1().
|
||||
StorageVersionMigrations().
|
||||
UpdateStatus(
|
||||
ctx,
|
||||
svm,
|
||||
metav1.UpdateOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) markAsActive(ctx context.Context, svm *svmv1beta1.StorageVersionMigration) error {
|
||||
// Mark CRD as undergoing migration.
|
||||
crd, exists, err := crc.crdForGroupResource(ctx, svm.Spec.Resource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
crd, err := crc.crdClient.Get(ctx, crd.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
applyConfig := applyconfigurationapiextensionsv1.CustomResourceDefinition(crd.Name).
|
||||
// We add resource version to ensure there were no changes to the CRD
|
||||
// between our get and apply, since if there was that may affect the
|
||||
// generation we set in the condition.
|
||||
WithResourceVersion(crd.ResourceVersion).
|
||||
WithStatus(applyconfigurationapiextensionsv1.CustomResourceDefinitionStatus().
|
||||
WithConditions(applyconfigurationapiextensionsv1.CustomResourceDefinitionCondition().
|
||||
WithType(apiextensionsv1.StorageMigrating).
|
||||
WithStatus(apiextensionsv1.ConditionTrue).
|
||||
WithLastTransitionTime(metav1.Now()).
|
||||
WithReason("MigrationRunning").
|
||||
WithMessage(fmt.Sprintf("Migration %s is running", svm.Name)).
|
||||
WithObservedGeneration(crd.Generation)))
|
||||
|
||||
// TODO: We should see if there's a way to surface non-conflict/not-found errors
|
||||
// to the SVM Status to let the user know that the migration is stuck due to issues
|
||||
// outside of the controller's control.
|
||||
_, err = crc.crdClient.ApplyStatus(ctx, applyConfig, metav1.ApplyOptions{FieldManager: MigrationRunnerControllerName, Force: true})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
svm = setStatusConditions(svm, svmv1beta1.MigrationRunning, "MigrationRunning", "The migration is running")
|
||||
_, err = crc.kubeClient.StoragemigrationV1beta1().
|
||||
StorageVersionMigrations().
|
||||
UpdateStatus(
|
||||
ctx,
|
||||
svm,
|
||||
metav1.UpdateOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func compareSVM(a, b *svmv1beta1.StorageVersionMigration) int {
|
||||
if i := a.CreationTimestamp.Compare(b.CreationTimestamp.Time); i != 0 {
|
||||
return i
|
||||
}
|
||||
return strings.Compare(a.Name, b.Name)
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) getSVMsForResource(resource metav1.GroupResource) (svmsToCleanup []*svmv1beta1.StorageVersionMigration, svmToPromote *svmv1beta1.StorageVersionMigration, err error) {
|
||||
svms, err := crc.svmListers.List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
svmInProgress := false
|
||||
for _, svm := range svms {
|
||||
if svm.Spec.Resource != resource {
|
||||
continue
|
||||
}
|
||||
svmRunning := isRunning(svm)
|
||||
svmTerminal := isTerminal(svm)
|
||||
promotable := !svmRunning && !svmTerminal && !svmInProgress
|
||||
switch {
|
||||
case svmRunning && svmTerminal:
|
||||
svmsToCleanup = append(svmsToCleanup, svm.DeepCopy())
|
||||
|
||||
case svmRunning:
|
||||
svmInProgress = true
|
||||
svmToPromote = nil
|
||||
|
||||
case promotable && (svmToPromote == nil || compareSVM(svm, svmToPromote) == -1):
|
||||
svmToPromote = svm
|
||||
}
|
||||
}
|
||||
|
||||
if svmToPromote != nil {
|
||||
svmToPromote = svmToPromote.DeepCopy()
|
||||
}
|
||||
return svmsToCleanup, svmToPromote, nil
|
||||
}
|
||||
|
||||
func isTerminal(svm *svmv1beta1.StorageVersionMigration) bool {
|
||||
return meta.IsStatusConditionTrue(svm.Status.Conditions, string(svmv1beta1.MigrationSucceeded)) ||
|
||||
meta.IsStatusConditionTrue(svm.Status.Conditions, string(svmv1beta1.MigrationFailed))
|
||||
}
|
||||
|
||||
func isRunning(svm *svmv1beta1.StorageVersionMigration) bool {
|
||||
return meta.IsStatusConditionTrue(svm.Status.Conditions, string(svmv1beta1.MigrationRunning))
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) cleanupCRD(ctx context.Context, toBeProcessedSVM *svmv1beta1.StorageVersionMigration) error {
|
||||
migratingSuccessfulCond := meta.FindStatusCondition(toBeProcessedSVM.Status.Conditions, string(svmv1beta1.MigrationSucceeded))
|
||||
success := migratingSuccessfulCond != nil && migratingSuccessfulCond.Status == metav1.ConditionTrue
|
||||
crd, exists, err := crc.crdForGroupResource(ctx, toBeProcessedSVM.Spec.Resource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
if err := crc.finishCRDMigration(ctx, crd, success); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) finishCRDMigration(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, success bool) error {
|
||||
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
crd, err := crc.crdClient.Get(ctx, crd.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
migratingCond := apihelpers.FindCRDCondition(crd, apiextensionsv1.StorageMigrating)
|
||||
var canUpdateStoredVersions bool
|
||||
if migratingCond != nil && migratingCond.ObservedGeneration == crd.Generation {
|
||||
canUpdateStoredVersions = true
|
||||
}
|
||||
|
||||
var reason, msg string
|
||||
if success {
|
||||
reason = "MigrationSucceeded"
|
||||
msg = "The migration has succeeded"
|
||||
if canUpdateStoredVersions {
|
||||
msg += " and the stored versions have been updated"
|
||||
} else {
|
||||
msg += " but the stored versions have not been updated due to a generation mismatch"
|
||||
}
|
||||
} else {
|
||||
reason = "MigrationFailed"
|
||||
msg = "The migration has failed"
|
||||
}
|
||||
|
||||
applyConfig := applyconfigurationapiextensionsv1.CustomResourceDefinition(crd.Name).
|
||||
// We add resource version to ensure there were no changes to the CRD
|
||||
// between our get and apply, since if there was that may affect the validity
|
||||
// of the stored versions we are about to set.
|
||||
WithResourceVersion(crd.ResourceVersion).
|
||||
WithStatus(applyconfigurationapiextensionsv1.CustomResourceDefinitionStatus().
|
||||
WithConditions(applyconfigurationapiextensionsv1.CustomResourceDefinitionCondition().
|
||||
WithType(apiextensionsv1.StorageMigrating).
|
||||
WithStatus(apiextensionsv1.ConditionFalse).
|
||||
WithLastTransitionTime(metav1.Now()).
|
||||
WithReason(reason).
|
||||
WithMessage(msg).
|
||||
WithObservedGeneration(crd.Generation)))
|
||||
|
||||
if canUpdateStoredVersions {
|
||||
// Wipe out all stored versions that we have migrated off of if nothing else
|
||||
// has transpired while the migration was in progress.
|
||||
for _, version := range crd.Spec.Versions {
|
||||
if version.Storage {
|
||||
applyConfig.Status.WithStoredVersions(version.Name)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: We should see if there's a way to surface non-conflict/not-found errors
|
||||
// to the SVM Status to let the user know that the migration is stuck due to issues
|
||||
// outside of the controller's control.
|
||||
_, err = crc.crdClient.ApplyStatus(ctx, applyConfig, metav1.ApplyOptions{FieldManager: MigrationRunnerControllerName, Force: true})
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (crc *MigrationRunnerController) crdForGroupResource(ctx context.Context, gr metav1.GroupResource) (*apiextensionsv1.CustomResourceDefinition, bool, error) {
|
||||
crdName := fmt.Sprintf("%s.%s", gr.Resource, gr.Group)
|
||||
crd, err := crc.crdClient.Get(ctx, crdName, metav1.GetOptions{})
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return crd, true, nil
|
||||
}
|
||||
471
pkg/controller/storageversionmigrator/migrationrunner_test.go
Normal file
471
pkg/controller/storageversionmigrator/migrationrunner_test.go
Normal file
|
|
@ -0,0 +1,471 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storageversionmigrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
svmv1beta1 "k8s.io/api/storagemigration/v1beta1"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
|
||||
apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
svminformers "k8s.io/client-go/informers"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
k8stesting "k8s.io/client-go/testing"
|
||||
)
|
||||
|
||||
func init() {
|
||||
_ = apiextensionsv1.AddToScheme(apiextensionsscheme.Scheme)
|
||||
}
|
||||
|
||||
func TestCustomResourceController_Sync(t *testing.T) {
|
||||
newSVM := func(name, group, resource string, conditions ...metav1.Condition) *svmv1beta1.StorageVersionMigration {
|
||||
return &svmv1beta1.StorageVersionMigration{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
},
|
||||
Spec: svmv1beta1.StorageVersionMigrationSpec{
|
||||
Resource: metav1.GroupResource{Group: group, Resource: resource},
|
||||
},
|
||||
Status: svmv1beta1.StorageVersionMigrationStatus{
|
||||
Conditions: conditions,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
newCRD := func(name string, generation int64, storageVersion string) *apiextensionsv1.CustomResourceDefinition {
|
||||
return &apiextensionsv1.CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Generation: generation,
|
||||
},
|
||||
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
|
||||
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
|
||||
{Name: "v1", Storage: storageVersion == "v1"},
|
||||
{Name: "v2", Storage: storageVersion == "v2"},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
newCRDPatchAction := func(name string, conditions []apiextensionsv1.CustomResourceDefinitionCondition, storedVersions []string) k8stesting.PatchAction {
|
||||
statusMap := map[string]interface{}{
|
||||
"conditions": conditions,
|
||||
}
|
||||
if storedVersions != nil {
|
||||
statusMap["storedVersions"] = storedVersions
|
||||
}
|
||||
patchObj := map[string]interface{}{
|
||||
"status": statusMap,
|
||||
}
|
||||
patch, err := json.Marshal(patchObj)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return k8stesting.NewPatchAction(
|
||||
apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"),
|
||||
"",
|
||||
name,
|
||||
types.MergePatchType,
|
||||
patch,
|
||||
)
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
svms []*svmv1beta1.StorageVersionMigration
|
||||
crd *apiextensionsv1.CustomResourceDefinition
|
||||
expectErr bool
|
||||
expectKubeActions []k8stesting.Action
|
||||
expectCRDActions []k8stesting.Action
|
||||
}{
|
||||
{
|
||||
name: "Initial sync: CRD missing",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets")},
|
||||
expectErr: false,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVM("test-svm", "example.com", "widgets", metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MigrationRunning",
|
||||
Message: "The migration is running",
|
||||
}),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Initial sync: CRD exists",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets")},
|
||||
crd: newCRD("widgets.example.com", 1, "v2"),
|
||||
expectErr: false,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVM("test-svm", "example.com", "widgets", metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MigrationRunning",
|
||||
Message: "The migration is running",
|
||||
}),
|
||||
),
|
||||
},
|
||||
expectCRDActions: []k8stesting.Action{
|
||||
newCRDPatchAction(
|
||||
"widgets.example.com",
|
||||
[]apiextensionsv1.CustomResourceDefinitionCondition{
|
||||
{
|
||||
Type: apiextensionsv1.StorageMigrating,
|
||||
Status: apiextensionsv1.ConditionTrue,
|
||||
Reason: "MigrationRunning",
|
||||
Message: "Migration test-svm is running",
|
||||
},
|
||||
},
|
||||
nil,
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Migration running (No-op)",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets", metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
}, metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
})},
|
||||
crd: newCRD("widgets.example.com", 1, "v2"),
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
name: "Migration succeeded: Update CRD stored versions",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets",
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationSucceeded),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
)},
|
||||
crd: func() *apiextensionsv1.CustomResourceDefinition {
|
||||
c := newCRD("widgets.example.com", 1, "v2")
|
||||
c.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{
|
||||
{
|
||||
Type: apiextensionsv1.StorageMigrating,
|
||||
Status: apiextensionsv1.ConditionTrue,
|
||||
ObservedGeneration: 1,
|
||||
Reason: "MigrationRunning",
|
||||
},
|
||||
}
|
||||
return c
|
||||
}(),
|
||||
expectErr: false,
|
||||
expectCRDActions: []k8stesting.Action{
|
||||
newCRDPatchAction(
|
||||
"widgets.example.com",
|
||||
[]apiextensionsv1.CustomResourceDefinitionCondition{
|
||||
{
|
||||
Type: apiextensionsv1.StorageMigrating,
|
||||
Status: apiextensionsv1.ConditionFalse,
|
||||
Reason: "MigrationSucceeded",
|
||||
Message: "The migration has succeeded and the stored versions have been updated",
|
||||
},
|
||||
},
|
||||
[]string{"v2"},
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Migration succeeded: CRD generation mismatch (No update)",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets",
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationSucceeded),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
)},
|
||||
crd: newCRD("widgets.example.com", 2, "v2"),
|
||||
expectErr: false,
|
||||
expectCRDActions: []k8stesting.Action{
|
||||
newCRDPatchAction(
|
||||
"widgets.example.com",
|
||||
[]apiextensionsv1.CustomResourceDefinitionCondition{
|
||||
{
|
||||
Type: apiextensionsv1.StorageMigrating,
|
||||
Status: apiextensionsv1.ConditionFalse,
|
||||
Reason: "MigrationSucceeded",
|
||||
Message: "The migration has succeeded but the stored versions have not been updated due to a generation mismatch",
|
||||
},
|
||||
},
|
||||
nil,
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Migration failed: CRD condition updated",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets",
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationFailed),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
)},
|
||||
crd: func() *apiextensionsv1.CustomResourceDefinition {
|
||||
c := newCRD("widgets.example.com", 1, "v2")
|
||||
c.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{
|
||||
{
|
||||
Type: apiextensionsv1.StorageMigrating,
|
||||
Status: apiextensionsv1.ConditionTrue,
|
||||
ObservedGeneration: 1,
|
||||
Reason: "MigrationRunning",
|
||||
},
|
||||
}
|
||||
return c
|
||||
}(),
|
||||
expectErr: false,
|
||||
expectCRDActions: []k8stesting.Action{
|
||||
newCRDPatchAction(
|
||||
"widgets.example.com",
|
||||
[]apiextensionsv1.CustomResourceDefinitionCondition{
|
||||
{
|
||||
Type: apiextensionsv1.StorageMigrating,
|
||||
Status: apiextensionsv1.ConditionFalse,
|
||||
Reason: "MigrationFailed",
|
||||
Message: "The migration has failed",
|
||||
},
|
||||
},
|
||||
[]string{"v2"},
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Cleanup admission updates SVM condition",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{newSVM("test-svm", "example.com", "widgets",
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationSucceeded),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MigrationRunning",
|
||||
ObservedGeneration: 1,
|
||||
},
|
||||
)},
|
||||
expectErr: false,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateSubresourceAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"status",
|
||||
"",
|
||||
newSVM("test-svm", "example.com", "widgets",
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationSucceeded),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionFalse,
|
||||
Reason: "MigrationCompleted",
|
||||
},
|
||||
),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Promote older SVM first",
|
||||
svms: []*svmv1beta1.StorageVersionMigration{
|
||||
func() *svmv1beta1.StorageVersionMigration {
|
||||
svm := newSVM("test-svm-newer", "example.com", "widgets")
|
||||
svm.CreationTimestamp = metav1.Time{Time: time.Now().Add(1 * time.Hour)}
|
||||
return svm
|
||||
}(),
|
||||
func() *svmv1beta1.StorageVersionMigration {
|
||||
svm := newSVM("test-svm-older", "example.com", "widgets")
|
||||
svm.CreationTimestamp = metav1.Time{Time: time.Now().Add(-1 * time.Hour)}
|
||||
return svm
|
||||
}(),
|
||||
},
|
||||
expectErr: false,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVM("test-svm-older", "example.com", "widgets", metav1.Condition{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MigrationRunning",
|
||||
Message: "The migration is running",
|
||||
}),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
filterActions := func(actions []k8stesting.Action) []k8stesting.Action {
|
||||
var ret []k8stesting.Action
|
||||
for _, action := range actions {
|
||||
if action.GetVerb() == "list" || action.GetVerb() == "watch" || action.GetVerb() == "get" {
|
||||
continue
|
||||
}
|
||||
ret = append(ret, action)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
var initialSVMs []runtime.Object
|
||||
for _, svm := range tc.svms {
|
||||
initialSVMs = append(initialSVMs, svm)
|
||||
}
|
||||
kubeClient := kubefake.NewClientset(initialSVMs...)
|
||||
|
||||
kubeInformerFactory := svminformers.NewSharedInformerFactory(kubeClient, 0)
|
||||
svmInformer := kubeInformerFactory.Storagemigration().V1beta1().StorageVersionMigrations()
|
||||
for _, svm := range tc.svms {
|
||||
err := svmInformer.Informer().GetStore().Add(svm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var initialCRDs []runtime.Object
|
||||
if tc.crd != nil {
|
||||
initialCRDs = append(initialCRDs, tc.crd)
|
||||
}
|
||||
crdClientSet := apiextensionsfake.NewClientset(initialCRDs...)
|
||||
|
||||
crdScheme := runtime.NewScheme()
|
||||
_ = apiextensionsv1.AddToScheme(crdScheme)
|
||||
crdTracker := k8stesting.NewObjectTracker(crdScheme, serializer.NewCodecFactory(crdScheme).UniversalDecoder())
|
||||
|
||||
for _, obj := range initialCRDs {
|
||||
_ = crdTracker.Add(obj)
|
||||
}
|
||||
|
||||
crdClientSet.PrependReactor("*", "*", k8stesting.ObjectReaction(crdTracker))
|
||||
|
||||
controller := NewCustomResourceController(
|
||||
ctx,
|
||||
kubeClient,
|
||||
svmInformer,
|
||||
crdClientSet.ApiextensionsV1().CustomResourceDefinitions(),
|
||||
)
|
||||
|
||||
resource := metav1.GroupResource{Group: "example.com", Resource: "widgets"}
|
||||
if len(tc.svms) > 0 {
|
||||
resource = tc.svms[0].Spec.Resource
|
||||
}
|
||||
|
||||
err := controller.sync(ctx, resource)
|
||||
|
||||
if tc.expectErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if tc.expectKubeActions != nil {
|
||||
kubeActions := filterActions(kubeClient.Actions())
|
||||
require.Len(t, kubeActions, len(tc.expectKubeActions), "mismatched number of kube client actions")
|
||||
|
||||
for i, expected := range tc.expectKubeActions {
|
||||
actual := kubeActions[i]
|
||||
require.Equal(t, expected.GetVerb(), actual.GetVerb(), "kube action %d: verb mismatch", i)
|
||||
require.Equal(t, expected.GetResource(), actual.GetResource(), "kube action %d: resource mismatch", i)
|
||||
|
||||
if updateAction, ok := actual.(k8stesting.UpdateAction); ok {
|
||||
actualObj := updateAction.GetObject().(*svmv1beta1.StorageVersionMigration)
|
||||
expectedObj := expected.(k8stesting.UpdateAction).GetObject().(*svmv1beta1.StorageVersionMigration)
|
||||
|
||||
require.Equal(t, expectedObj.Name, actualObj.Name, "object name mismatch")
|
||||
require.Len(t, actualObj.Status.Conditions, len(expectedObj.Status.Conditions), "condition count mismatch")
|
||||
for j, expCond := range expectedObj.Status.Conditions {
|
||||
actCond := actualObj.Status.Conditions[j]
|
||||
require.Equal(t, expCond.Type, actCond.Type, "condition type mismatch")
|
||||
require.Equal(t, expCond.Status, actCond.Status, "condition status mismatch")
|
||||
if expCond.Reason != "" {
|
||||
require.Equal(t, expCond.Reason, actCond.Reason, "condition reason mismatch")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify CRD status updates
|
||||
if tc.expectCRDActions != nil {
|
||||
crdActions := filterActions(crdClientSet.Actions())
|
||||
require.Len(t, crdActions, len(tc.expectCRDActions), "mismatched number of crd client actions")
|
||||
|
||||
for i, expected := range tc.expectCRDActions {
|
||||
actual := crdActions[i]
|
||||
require.Equal(t, expected.GetVerb(), actual.GetVerb(), "crd action %d: verb mismatch", i)
|
||||
|
||||
if patchAction, ok := actual.(k8stesting.PatchAction); ok {
|
||||
actualPatch := patchAction.GetPatch()
|
||||
expectedPatch := expected.(k8stesting.PatchAction).GetPatch()
|
||||
|
||||
var actualMap, expectedMap map[string]interface{}
|
||||
require.NoError(t, json.Unmarshal(actualPatch, &actualMap))
|
||||
require.NoError(t, json.Unmarshal(expectedPatch, &expectedMap))
|
||||
|
||||
// Helper to verify conditions ignoring LastTransitionTime
|
||||
actualStatus := actualMap["status"].(map[string]interface{})
|
||||
expectedStatus := expectedMap["status"].(map[string]interface{})
|
||||
|
||||
if expectedConds, ok := expectedStatus["conditions"].([]interface{}); ok {
|
||||
actualConds := actualStatus["conditions"].([]interface{})
|
||||
require.Len(t, actualConds, len(expectedConds))
|
||||
for k, expC := range expectedConds {
|
||||
expCondMap := expC.(map[string]interface{})
|
||||
actCondMap := actualConds[k].(map[string]interface{})
|
||||
require.Equal(t, expCondMap["type"], actCondMap["type"])
|
||||
require.Equal(t, expCondMap["status"], actCondMap["status"])
|
||||
require.Equal(t, expCondMap["reason"], actCondMap["reason"])
|
||||
require.Equal(t, expCondMap["message"], actCondMap["message"])
|
||||
}
|
||||
// Remove conditions for deep equal of the rest
|
||||
delete(actualStatus, "conditions")
|
||||
delete(expectedStatus, "conditions")
|
||||
}
|
||||
|
||||
require.Equal(t, expectedStatus, actualStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -189,6 +189,11 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error
|
|||
}
|
||||
// working with copy to avoid race condition between this and migration controller
|
||||
toBeProcessedSVM := svm.DeepCopy()
|
||||
|
||||
if !meta.IsStatusConditionTrue(toBeProcessedSVM.Status.Conditions, string(svmv1beta1.MigrationRunning)) {
|
||||
logger.V(4).Info("Migration is not running yet, skipping", "svm", name)
|
||||
return nil
|
||||
}
|
||||
gr := toBeProcessedSVM.Spec.Resource
|
||||
|
||||
if meta.IsStatusConditionTrue(toBeProcessedSVM.Status.Conditions, string(svmv1beta1.MigrationSucceeded)) ||
|
||||
|
|
|
|||
|
|
@ -206,10 +206,20 @@ func TestRVSync(t *testing.T) {
|
|||
expectErr string
|
||||
expectKubeActions []kubetesting.Action
|
||||
}{
|
||||
{
|
||||
name: "Migration not active",
|
||||
key: "inactive-svm",
|
||||
svm: newSVMWithConditions("inactive-svm", "", []metav1.Condition{}),
|
||||
},
|
||||
{
|
||||
name: "Successful RV acquisition",
|
||||
key: "test-svm",
|
||||
svm: newSVM("test-svm", ""),
|
||||
svm: newSVMWithConditions("test-svm", "", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
discoveryResources: &metav1.APIResourceList{
|
||||
GroupVersion: "apps/v1",
|
||||
APIResources: []metav1.APIResource{
|
||||
|
|
@ -225,7 +235,12 @@ func TestRVSync(t *testing.T) {
|
|||
kubetesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVM("test-svm", "12345"),
|
||||
newSVMWithConditions("test-svm", "12345", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
),
|
||||
},
|
||||
},
|
||||
|
|
@ -234,6 +249,11 @@ func TestRVSync(t *testing.T) {
|
|||
key: "non-existent-svm",
|
||||
svm: nil,
|
||||
},
|
||||
{
|
||||
name: "SVM has no CRD condition",
|
||||
key: "succeeded-svm",
|
||||
svm: newSVM("succeeded-svm", ""),
|
||||
},
|
||||
{
|
||||
name: "SVM already succeeded",
|
||||
key: "succeeded-svm",
|
||||
|
|
@ -257,12 +277,22 @@ func TestRVSync(t *testing.T) {
|
|||
{
|
||||
name: "RV already set",
|
||||
key: "rv-set-svm",
|
||||
svm: newSVM("rv-set-svm", "123"),
|
||||
svm: newSVMWithConditions("rv-set-svm", "123", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Resource not migratable",
|
||||
key: "not-migratable-svm",
|
||||
svm: newSVM("not-migratable-svm", ""),
|
||||
svm: newSVMWithConditions("not-migratable-svm", "", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
discoveryResources: &metav1.APIResourceList{
|
||||
GroupVersion: "apps/v1",
|
||||
APIResources: []metav1.APIResource{
|
||||
|
|
@ -274,6 +304,10 @@ func TestRVSync(t *testing.T) {
|
|||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("not-migratable-svm", "", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationFailed),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
@ -285,7 +319,12 @@ func TestRVSync(t *testing.T) {
|
|||
{
|
||||
name: "Metadata list error",
|
||||
key: "metadata-error-svm",
|
||||
svm: newSVM("metadata-error-svm", ""),
|
||||
svm: newSVMWithConditions("metadata-error-svm", "", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
discoveryResources: &metav1.APIResourceList{
|
||||
GroupVersion: "apps/v1",
|
||||
APIResources: []metav1.APIResource{
|
||||
|
|
@ -299,7 +338,12 @@ func TestRVSync(t *testing.T) {
|
|||
{
|
||||
name: "Invalid RV returned",
|
||||
key: "invalid-rv-svm",
|
||||
svm: newSVM("invalid-rv-svm", ""),
|
||||
svm: newSVMWithConditions("invalid-rv-svm", "", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
discoveryResources: &metav1.APIResourceList{
|
||||
GroupVersion: "apps/v1",
|
||||
APIResources: []metav1.APIResource{
|
||||
|
|
@ -316,6 +360,10 @@ func TestRVSync(t *testing.T) {
|
|||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("invalid-rv-svm", "", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationFailed),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
|
|||
|
|
@ -262,17 +262,6 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
|
|||
return fmt.Errorf("GC cache is not up to date, requeuing to attempt again. gcListResourceVersion: %s, listResourceVersion: %s", gcListResourceVersion, listResourceVersion)
|
||||
}
|
||||
|
||||
toBeProcessedSVM, err = svmc.kubeClient.StoragemigrationV1beta1().
|
||||
StorageVersionMigrations().
|
||||
UpdateStatus(
|
||||
ctx,
|
||||
setStatusConditions(toBeProcessedSVM, svmv1beta1.MigrationRunning, migrationRunningStatusReason, ""),
|
||||
metav1.UpdateOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err, failedMigration := svmc.runMigration(ctx, *gvr, resourceMonitor, toBeProcessedSVM, listResourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -175,22 +175,10 @@ func TestSync(t *testing.T) {
|
|||
},
|
||||
expectErr: false,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
}}),
|
||||
),
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionFalse,
|
||||
},
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationSucceeded),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
@ -268,22 +256,10 @@ func TestSync(t *testing.T) {
|
|||
},
|
||||
expectErr: false,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
}}),
|
||||
),
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionFalse,
|
||||
},
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationFailed),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
@ -311,20 +287,6 @@ func TestSync(t *testing.T) {
|
|||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
),
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionFalse,
|
||||
},
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationSucceeded),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
@ -359,18 +321,6 @@ func TestSync(t *testing.T) {
|
|||
"ns1/res1": apierrors.NewTooManyRequests("simulating throttling", 1),
|
||||
},
|
||||
expectErr: true,
|
||||
expectKubeActions: []k8stesting.Action{
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
),
|
||||
},
|
||||
expectDynamicActions: []k8stesting.Action{
|
||||
k8stesting.NewPatchAction(testGVR, "ns1", "res1", types.ApplyPatchType, mustMarshal(t, typeMetaUIDRV{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "Deployment"},
|
||||
|
|
@ -422,20 +372,6 @@ func TestSync(t *testing.T) {
|
|||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
}),
|
||||
),
|
||||
k8stesting.NewUpdateAction(
|
||||
svmv1beta1.SchemeGroupVersion.WithResource("storageversionmigrations"),
|
||||
"",
|
||||
newSVMWithConditions("test-svm", "100", []metav1.Condition{
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationRunning),
|
||||
Status: metav1.ConditionFalse,
|
||||
},
|
||||
{
|
||||
Type: string(svmv1beta1.MigrationFailed),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
@ -484,7 +420,7 @@ func TestSync(t *testing.T) {
|
|||
|
||||
if tc.expectKubeActions != nil {
|
||||
kubeActions := filterActions(kubeClient.Actions())
|
||||
require.Len(t, kubeActions, len(tc.expectKubeActions), "mismatched number of kube client actions")
|
||||
require.Len(t, kubeActions, len(tc.expectKubeActions), "mismatched number of kube client actions, expected %d, got %d", len(tc.expectKubeActions), len(kubeActions))
|
||||
|
||||
for i, expected := range tc.expectKubeActions {
|
||||
actual := kubeActions[i]
|
||||
|
|
|
|||
|
|
@ -34,14 +34,6 @@ func setStatusConditions(
|
|||
return toBeUpdatedSVM
|
||||
}
|
||||
|
||||
if conditionType == svmv1beta1.MigrationSucceeded || conditionType == svmv1beta1.MigrationFailed {
|
||||
// set running condition to false if we're finished
|
||||
runningCond := meta.FindStatusCondition(toBeUpdatedSVM.Status.Conditions, string(svmv1beta1.MigrationRunning))
|
||||
if runningCond != nil {
|
||||
runningCond.Status = metav1.ConditionFalse
|
||||
}
|
||||
}
|
||||
|
||||
toBeUpdatedSVM.Status.Conditions = append(toBeUpdatedSVM.Status.Conditions, metav1.Condition{
|
||||
Type: string(conditionType),
|
||||
Status: metav1.ConditionTrue,
|
||||
|
|
|
|||
|
|
@ -1968,6 +1968,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
|||
|
||||
apiextensionsfeatures.CRDObservedGenerationTracking: {
|
||||
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Beta},
|
||||
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
apiextensionsfeatures.CRDValidationRatcheting: {
|
||||
|
|
|
|||
|
|
@ -535,6 +535,8 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
|
|||
// need patch for SSA of any resource
|
||||
// need create because SSA of a deleted resource will be interpreted as a create request, these always fail with a conflict error because UID is set
|
||||
rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(),
|
||||
rbacv1helpers.NewRule("get").Groups("apiextensions.k8s.io").Resources("customresourcedefinitions").RuleOrDie(),
|
||||
rbacv1helpers.NewRule("patch").Groups("apiextensions.k8s.io").Resources("customresourcedefinitions/status").RuleOrDie(),
|
||||
rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(),
|
||||
},
|
||||
})
|
||||
|
|
|
|||
|
|
@ -332,6 +332,9 @@ const (
|
|||
// be true if .metadata.annotations["api-approved.kubernetes.io"] is set to a URL, otherwise it will be false.
|
||||
// See https://github.com/kubernetes/enhancements/pull/1111 for more details.
|
||||
KubernetesAPIApprovalPolicyConformant CustomResourceDefinitionConditionType = "KubernetesAPIApprovalPolicyConformant"
|
||||
// StorageMigrating indicates that the underlying storage version of the CRD
|
||||
// is undergoing migration.
|
||||
StorageMigrating CustomResourceDefinitionConditionType = "StorageMigrating"
|
||||
)
|
||||
|
||||
// CustomResourceDefinitionCondition contains details for the current condition of this pod.
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ func init() {
|
|||
var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate.VersionedSpecs{
|
||||
CRDObservedGenerationTracking: {
|
||||
{Version: version.MustParse("1.35"), PreRelease: featuregate.Beta, Default: false},
|
||||
{Version: version.MustParse("1.36"), PreRelease: featuregate.Beta, Default: true},
|
||||
},
|
||||
CRDValidationRatcheting: {
|
||||
{Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Alpha},
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@
|
|||
| CPUManagerPolicyAlphaOptions | | | 1.23– | | | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyAlphaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyAlphaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CPUManagerPolicyBetaOptions | :ballot_box_with_check: 1.23+ | | | 1.23– | | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyBetaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyBetaOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CPUManagerPolicyOptions | :ballot_box_with_check: 1.23+ | :closed_lock_with_key: 1.33+ | 1.22 | 1.23–1.32 | 1.33– | | | [code](https://cs.k8s.io/?q=%5CbCPUManagerPolicyOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCPUManagerPolicyOptions%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CRDObservedGenerationTracking | | | | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CRDObservedGenerationTracking | :ballot_box_with_check: 1.36+ | | | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDObservedGenerationTracking%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CRDValidationRatcheting | :ballot_box_with_check: 1.30+ | :closed_lock_with_key: 1.33+ | 1.28–1.29 | 1.30–1.32 | 1.33– | | | [code](https://cs.k8s.io/?q=%5CbCRDValidationRatcheting%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCRDValidationRatcheting%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CSIServiceAccountTokenSecrets | :ballot_box_with_check: 1.35+ | :closed_lock_with_key: 1.36+ | | 1.35 | 1.36– | | | [code](https://cs.k8s.io/?q=%5CbCSIServiceAccountTokenSecrets%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCSIServiceAccountTokenSecrets%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| CSIVolumeHealth | | | 1.21– | | | | | [code](https://cs.k8s.io/?q=%5CbCSIVolumeHealth%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbCSIVolumeHealth%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
|
|
|
|||
|
|
@ -365,6 +365,10 @@
|
|||
lockToDefault: false
|
||||
preRelease: Beta
|
||||
version: "1.35"
|
||||
- default: true
|
||||
lockToDefault: false
|
||||
preRelease: Beta
|
||||
version: "1.36"
|
||||
- name: CRDValidationRatcheting
|
||||
versionedSpecs:
|
||||
- default: false
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"go.uber.org/goleak"
|
||||
|
||||
extensionfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
|
@ -154,6 +155,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
|
|||
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
|
||||
features.StorageVersionMigrator: true,
|
||||
featuregate.Feature(clientgofeaturegate.InformerResourceVersion): true,
|
||||
extensionfeatures.CRDObservedGenerationTracking: true,
|
||||
})
|
||||
// decode errors are expected when using conversation webhooks
|
||||
etcd3watcher.TestOnlySetFatalOnDecodeError(false)
|
||||
|
|
@ -262,7 +264,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Failed to create SVM resource: %v", err)
|
||||
}
|
||||
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, "triggercr"); !ok {
|
||||
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, crd.Name, "triggercr"); !ok {
|
||||
t.Fatalf("CRD not migrated")
|
||||
}
|
||||
|
||||
|
|
@ -303,6 +305,7 @@ func TestStorageVersionMigrationDuringChaos(t *testing.T) {
|
|||
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
|
||||
features.StorageVersionMigrator: true,
|
||||
featuregate.Feature(clientgofeaturegate.InformerResourceVersion): true,
|
||||
extensionfeatures.CRDObservedGenerationTracking: true,
|
||||
})
|
||||
|
||||
ctx := ktesting.Init(t)
|
||||
|
|
@ -343,7 +346,7 @@ func TestStorageVersionMigrationDuringChaos(t *testing.T) {
|
|||
return
|
||||
}
|
||||
triggerCRName := "chaos-trigger-" + strconv.Itoa(i)
|
||||
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, triggerCRName); !ok {
|
||||
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, crd.Name, triggerCRName); !ok {
|
||||
t.Errorf("CRD not migrated")
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1054,6 +1054,24 @@ func (svm *svmTest) setupServerCert(t *testing.T) *certContext {
|
|||
}
|
||||
}
|
||||
|
||||
func (svm *svmTest) crdMigrated(t *testing.T, crdName string) bool {
|
||||
t.Helper()
|
||||
|
||||
crd, err := svm.apiextensionsclient.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), crdName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get CRD: %v", err)
|
||||
}
|
||||
|
||||
var storedVersion string
|
||||
for _, version := range crd.Spec.Versions {
|
||||
if version.Storage {
|
||||
storedVersion = version.Name
|
||||
}
|
||||
}
|
||||
|
||||
return len(crd.Status.StoredVersions) == 1 && crd.Status.StoredVersions[0] == storedVersion
|
||||
}
|
||||
|
||||
func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bool {
|
||||
t.Helper()
|
||||
|
||||
|
|
@ -1072,7 +1090,7 @@ func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bo
|
|||
return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version)
|
||||
}
|
||||
|
||||
func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, triggerCRName string) bool {
|
||||
func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, crdName, triggerCRName string) bool {
|
||||
t.Helper()
|
||||
|
||||
var triggerOnce sync.Once
|
||||
|
|
@ -1099,8 +1117,8 @@ func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName,
|
|||
return false, nil
|
||||
}
|
||||
|
||||
if metaconditions.IsStatusConditionTrue(svmConditions, string(svmv1beta1.MigrationSucceeded)) {
|
||||
t.Logf("%q SVM has completed migration", crdSVMName)
|
||||
if metaconditions.IsStatusConditionTrue(svmConditions, string(svmv1beta1.MigrationSucceeded)) && svm.crdMigrated(t, crdName) {
|
||||
t.Logf("%q SVM has completed migration for crd %s", crdSVMName, crdName)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue