diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index 20f9523d050..a556ebd4240 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -25,7 +25,6 @@ import ( "time" "k8s.io/client-go/util/flowcontrol" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" @@ -35,85 +34,115 @@ import ( func newDaemonSetControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.DaemonSetController, - aliases: []string{"daemonset"}, - initFunc: startDaemonSetController, + name: names.DaemonSetController, + aliases: []string{"daemonset"}, + constructor: newDaemonSetController, } } -func startDaemonSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { + +func newDaemonSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("daemon-set-controller") + if err != nil { + return nil, err + } + dsc, err := daemon.NewDaemonSetsController( ctx, controllerContext.InformerFactory.Apps().V1().DaemonSets(), controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Nodes(), - controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"), + client, flowcontrol.NewBackOff(1*time.Second, 15*time.Minute), ) if err != nil { - return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err) + return nil, fmt.Errorf("error creating DaemonSets controller: %w", err) } - go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { 
+ dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)) + }, controllerName), nil } func newStatefulSetControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.StatefulSetController, - aliases: []string{"statefulset"}, - initFunc: startStatefulSetController, + name: names.StatefulSetController, + aliases: []string{"statefulset"}, + constructor: newStatefulSetController, } } -func startStatefulSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go statefulset.NewStatefulSetController( + +func newStatefulSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("statefulset-controller") + if err != nil { + return nil, err + } + + ssc := statefulset.NewStatefulSetController( ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Apps().V1().StatefulSets(), controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), - controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"), - ).Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs)) - return nil, true, nil + client, + ) + return newControllerLoop(func(ctx context.Context) { + ssc.Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs)) + }, controllerName), nil } func newReplicaSetControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ReplicaSetController, - aliases: []string{"replicaset"}, - initFunc: startReplicaSetController, + name: names.ReplicaSetController, + aliases: []string{"replicaset"}, + constructor: newReplicaSetController, } } -func startReplicaSetController(ctx context.Context, controllerContext 
ControllerContext, controllerName string) (controller.Interface, bool, error) { - go replicaset.NewReplicaSetController( +func newReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("replicaset-controller") + if err != nil { + return nil, err + } + + rsc := replicaset.NewReplicaSetController( ctx, controllerContext.InformerFactory.Apps().V1().ReplicaSets(), controllerContext.InformerFactory.Core().V1().Pods(), - controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"), + client, replicaset.BurstReplicas, - ).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) - return nil, true, nil + ) + return newControllerLoop(func(ctx context.Context) { + rsc.Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) + }, controllerName), nil } func newDeploymentControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.DeploymentController, - aliases: []string{"deployment"}, - initFunc: startDeploymentController, + name: names.DeploymentController, + aliases: []string{"deployment"}, + constructor: newDeploymentController, } } -func startDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("deployment-controller") + if err != nil { + return nil, err + } + dc, err := deployment.NewDeploymentController( ctx, controllerContext.InformerFactory.Apps().V1().Deployments(), controllerContext.InformerFactory.Apps().V1().ReplicaSets(), controllerContext.InformerFactory.Core().V1().Pods(), - controllerContext.ClientBuilder.ClientOrDie("deployment-controller"), + client, ) if err != nil { - return nil, true, 
fmt.Errorf("error creating Deployment controller: %v", err) + return nil, fmt.Errorf("error creating Deployment controller: %w", err) } - go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs)) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs)) + }, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go index ff0c6d60f30..f41592ffc17 100644 --- a/cmd/kube-controller-manager/app/autoscaling.go +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -21,14 +21,12 @@ package app import ( "context" - + "fmt" "k8s.io/client-go/dynamic" "k8s.io/client-go/scale" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/podautoscaler" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" - resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" "k8s.io/metrics/pkg/client/custom_metrics" "k8s.io/metrics/pkg/client/external_metrics" @@ -36,47 +34,50 @@ import ( func newHorizontalPodAutoscalerControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.HorizontalPodAutoscalerController, - aliases: []string{"horizontalpodautoscaling"}, - initFunc: startHorizontalPodAutoscalerControllerWithRESTClient, + name: names.HorizontalPodAutoscalerController, + aliases: []string{"horizontalpodautoscaling"}, + constructor: newHorizontalPodAutoscalerController, } } -func startHorizontalPodAutoscalerControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newHorizontalPodAutoscalerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + clientConfig, err := 
controllerContext.NewClientConfig("horizontal-pod-autoscaler") + if err != nil { + return nil, err + } - clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") - hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") - - apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery()) - // invalidate the discovery information roughly once per resync interval our API - // information is *at most* two resync intervals old. - go custom_metrics.PeriodicallyInvalidate( - apiVersionsGetter, - controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, - ctx.Done()) - - metricsClient := metrics.NewRESTMetricsClient( - resourceclient.NewForConfigOrDie(clientConfig), - custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter), - external_metrics.NewForConfigOrDie(clientConfig), - ) - return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient) -} - -func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) { - - hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") - hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") + hpaClient, err := controllerContext.NewClient("horizontal-pod-autoscaler") + if err != nil { + return nil, err + } // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching, // so we want to re-fetch every time when we actually ask for it scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) - scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + scaleClient, err := scale.NewForConfig(clientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, 
scaleKindResolver) if err != nil { - return nil, false, err + return nil, fmt.Errorf("failed to init HPA scale client: %w", err) } - go podautoscaler.NewHorizontalController( + apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery()) + + resourceClient, err := resourceclient.NewForConfig(clientConfig) + if err != nil { + return nil, fmt.Errorf("failed to init the resource client for %s: %w", controllerName, err) + } + + externalMetricsClient, err := external_metrics.NewForConfig(clientConfig) + if err != nil { + return nil, fmt.Errorf("failed to init the external metrics client for %s: %w", controllerName, err) + } + + metricsClient := metrics.NewRESTMetricsClient( + resourceClient, + custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter), + externalMetricsClient, + ) + + pas := podautoscaler.NewHorizontalController( ctx, hpaClient.CoreV1(), scaleClient, @@ -90,6 +91,16 @@ func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, - ).Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs)) - return nil, true, nil + ) + return newControllerLoop(concurrentRun( + func(ctx context.Context) { + custom_metrics.PeriodicallyInvalidate( + apiVersionsGetter, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, + ctx.Done()) + }, + func(ctx context.Context) { + pas.Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs)) + }, + ), controllerName), nil } diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index 159aebd8284..a1ae796755c 
100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -23,7 +23,6 @@ import ( "context" "fmt" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/cronjob" "k8s.io/kubernetes/pkg/controller/job" @@ -31,43 +30,58 @@ import ( func newJobControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.JobController, - aliases: []string{"job"}, - initFunc: startJobController, + name: names.JobController, + aliases: []string{"job"}, + constructor: newJobController, } } -func startJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - jobController, err := job.NewController( +func newJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("job-controller") + if err != nil { + return nil, err + } + + jc, err := job.NewController( ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Batch().V1().Jobs(), - controllerContext.ClientBuilder.ClientOrDie("job-controller"), + client, ) if err != nil { - return nil, true, fmt.Errorf("creating Job controller: %v", err) + return nil, fmt.Errorf("creating Job controller: %w", err) } - go jobController.Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs)) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + jc.Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs)) + }, controllerName), nil } func newCronJobControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.CronJobController, - aliases: []string{"cronjob"}, - initFunc: startCronJobController, + name: names.CronJobController, + aliases: []string{"cronjob"}, + constructor: newCronJobController, 
} } -func startCronJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(), +func newCronJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("cronjob-controller") + if err != nil { + return nil, err + } + + cj2c, err := cronjob.NewControllerV2( + ctx, + controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.InformerFactory.Batch().V1().CronJobs(), - controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"), + client, ) if err != nil { - return nil, true, fmt.Errorf("creating CronJob controller V2: %v", err) + return nil, fmt.Errorf("creating CronJob controller V2: %w", err) } - go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs)) - return nil, true, nil + return newControllerLoop(func(ctx context.Context) { + cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs)) + }, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/bootstrap.go b/cmd/kube-controller-manager/app/bootstrap.go index aedaaf65aa0..4a3aeb8ed44 100644 --- a/cmd/kube-controller-manager/app/bootstrap.go +++ b/cmd/kube-controller-manager/app/bootstrap.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/bootstrap" ) @@ -29,41 +28,53 @@ func newBootstrapSignerControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ name: names.BootstrapSignerController, aliases: []string{"bootstrapsigner"}, - initFunc: startBootstrapSignerController, + constructor: newBootstrapSignerController, isDisabledByDefault: true, } } -func startBootstrapSignerController(ctx 
context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { + +func newBootstrapSignerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("bootstrap-signer") + if err != nil { + return nil, err + } + bsc, err := bootstrap.NewSigner( - controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"), + client, controllerContext.InformerFactory.Core().V1().Secrets(), controllerContext.InformerFactory.Core().V1().ConfigMaps(), bootstrap.DefaultSignerOptions(), ) if err != nil { - return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err) + return nil, fmt.Errorf("error creating BootstrapSigner controller: %w", err) } - go bsc.Run(ctx) - return nil, true, nil + + return newControllerLoop(bsc.Run, controllerName), nil } func newTokenCleanerControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ name: names.TokenCleanerController, aliases: []string{"tokencleaner"}, - initFunc: startTokenCleanerController, + constructor: newTokenCleanerController, isDisabledByDefault: true, } } -func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { + +func newTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("token-cleaner") + if err != nil { + return nil, err + } + tcc, err := bootstrap.NewTokenCleaner( - controllerContext.ClientBuilder.ClientOrDie("token-cleaner"), + client, controllerContext.InformerFactory.Core().V1().Secrets(), bootstrap.DefaultTokenCleanerOptions(), ) if err != nil { - return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err) + return nil, fmt.Errorf("error creating TokenCleaner controller: %w", err) } - go tcc.Run(ctx) - 
return nil, true, nil + + return newControllerLoop(tcc.Run, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index 2a58dff5bda..22f9e055d0c 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -29,10 +29,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/server/dynamiccertificates" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" "k8s.io/component-base/featuregate" - "k8s.io/controller-manager/controller" "k8s.io/klog/v2" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/certificates/approver" @@ -47,33 +45,41 @@ import ( func newCertificateSigningRequestSigningControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.CertificateSigningRequestSigningController, - aliases: []string{"csrsigning"}, - initFunc: startCertificateSigningRequestSigningController, + name: names.CertificateSigningRequestSigningController, + aliases: []string{"csrsigning"}, + constructor: newCertificateSigningRequestSigningController, } } -func startCertificateSigningRequestSigningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newCertificateSigningRequestSigningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { logger := klog.FromContext(ctx) missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == "" if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) { logger.Info("Skipping CSR signer controller because no csr cert/key was specified") - 
return nil, false, nil + return nil, nil } if !missingSingleSigningFile && anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) { - return nil, false, fmt.Errorf("cannot specify default and per controller certs at the same time") + return nil, fmt.Errorf("cannot specify default and per controller certs at the same time") + } + + c, err := controllerContext.NewClient("certificate-controller") + if err != nil { + return nil, err } - c := controllerContext.ClientBuilder.ClientOrDie("certificate-controller") csrInformer := controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests() certTTL := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration + var rx []runFunc if kubeletServingSignerCertFile, kubeletServingSignerKeyFile := getKubeletServingSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletServingSignerCertFile) > 0 || len(kubeletServingSignerKeyFile) > 0 { kubeletServingSigner, err := signer.NewKubeletServingCSRSigningController(ctx, c, csrInformer, kubeletServingSignerCertFile, kubeletServingSignerKeyFile, certTTL) if err != nil { - return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) + return nil, fmt.Errorf("failed to init kubernetes.io/kubelet-serving certificate controller: %w", err) } - go kubeletServingSigner.Run(ctx, 5) + + rx = append(rx, func(ctx context.Context) { + kubeletServingSigner.Run(ctx, 5) + }) } else { logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kubelet-serving") } @@ -81,9 +87,12 @@ func startCertificateSigningRequestSigningController(ctx context.Context, contro if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) 
> 0 { kubeletClientSigner, err := signer.NewKubeletClientCSRSigningController(ctx, c, csrInformer, kubeletClientSignerCertFile, kubeletClientSignerKeyFile, certTTL) if err != nil { - return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) + return nil, fmt.Errorf("failed to init kubernetes.io/kube-apiserver-client-kubelet certificate controller: %w", err) } - go kubeletClientSigner.Run(ctx, 5) + + rx = append(rx, func(ctx context.Context) { + kubeletClientSigner.Run(ctx, 5) + }) } else { logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kube-apiserver-client-kubelet") } @@ -91,9 +100,12 @@ func startCertificateSigningRequestSigningController(ctx context.Context, contro if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 { kubeAPIServerClientSigner, err := signer.NewKubeAPIServerClientCSRSigningController(ctx, c, csrInformer, kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile, certTTL) if err != nil { - return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) + return nil, fmt.Errorf("failed to init kubernetes.io/kube-apiserver-client certificate controller: %w", err) } - go kubeAPIServerClientSigner.Run(ctx, 5) + + rx = append(rx, func(ctx context.Context) { + kubeAPIServerClientSigner.Run(ctx, 5) + }) } else { logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kube-apiserver-client") } @@ -101,14 +113,17 @@ func startCertificateSigningRequestSigningController(ctx context.Context, contro if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := 
getLegacyUnknownSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 { legacyUnknownSigner, err := signer.NewLegacyUnknownCSRSigningController(ctx, c, csrInformer, legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile, certTTL) if err != nil { - return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) + return nil, fmt.Errorf("failed to init kubernetes.io/legacy-unknown certificate controller: %w", err) } - go legacyUnknownSigner.Run(ctx, 5) + + rx = append(rx, func(ctx context.Context) { + legacyUnknownSigner.Run(ctx, 5) + }) } else { logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/legacy-unknown") } - return nil, true, nil + return newControllerLoop(concurrentRun(rx...), controllerName), nil } func areKubeletServingSignerFilesSpecified(config csrsigningconfig.CSRSigningControllerConfiguration) bool { @@ -171,49 +186,60 @@ func getLegacyUnknownSignerFiles(config csrsigningconfig.CSRSigningControllerCon func newCertificateSigningRequestApprovingControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.CertificateSigningRequestApprovingController, - aliases: []string{"csrapproving"}, - initFunc: startCertificateSigningRequestApprovingController, + name: names.CertificateSigningRequestApprovingController, + aliases: []string{"csrapproving"}, + constructor: newCertificateSigningRequestApprovingController, } } -func startCertificateSigningRequestApprovingController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - approver := approver.NewCSRApprovingController( +func newCertificateSigningRequestApprovingController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, 
err := controllerContext.NewClient("certificate-controller") + if err != nil { + return nil, err + } + + ac := approver.NewCSRApprovingController( ctx, - controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), + client, controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go approver.Run(ctx, 5) - - return nil, true, nil + return newControllerLoop(func(ctx context.Context) { + ac.Run(ctx, 5) + }, controllerName), nil } func newCertificateSigningRequestCleanerControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.CertificateSigningRequestCleanerController, - aliases: []string{"csrcleaner"}, - initFunc: startCertificateSigningRequestCleanerController, + name: names.CertificateSigningRequestCleanerController, + aliases: []string{"csrcleaner"}, + constructor: newCertificateSigningRequestCleanerController, } } -func startCertificateSigningRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - cleaner := cleaner.NewCSRCleanerController( - controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), +func newCertificateSigningRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("certificate-controller") + if err != nil { + return nil, err + } + + cc := cleaner.NewCSRCleanerController( + client.CertificatesV1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go cleaner.Run(ctx, 1) - return nil, true, nil + return newControllerLoop(func(ctx context.Context) { + cc.Run(ctx, 1) + }, controllerName), nil } func newPodCertificateRequestCleanerControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: 
names.PodCertificateRequestCleanerController, - initFunc: startPodCertificateRequestCleanerController, + name: names.PodCertificateRequestCleanerController, + constructor: newPodCertificateRequestCleanerController, requiredFeatureGates: []featuregate.Feature{ features.PodCertificateRequest, }, } } -func startPodCertificateRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newPodCertificateRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { cleaner := cleaner.NewPCRCleanerController( controllerContext.ClientBuilder.ClientOrDie("podcertificaterequestcleaner"), controllerContext.InformerFactory.Certificates().V1alpha1().PodCertificateRequests(), @@ -221,60 +247,69 @@ func startPodCertificateRequestCleanerController(ctx context.Context, controller 15*time.Minute, // We expect all PodCertificateRequest flows to complete faster than this. 
5*time.Minute, ) - go cleaner.Run(ctx, 1) - return nil, true, nil + return newControllerLoop(func(ctx context.Context) { + cleaner.Run(ctx, 1) + }, controllerName), nil } func newRootCACertificatePublisherControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.RootCACertificatePublisherController, - aliases: []string{"root-ca-cert-publisher"}, - initFunc: startRootCACertificatePublisherController, + name: names.RootCACertificatePublisherController, + aliases: []string{"root-ca-cert-publisher"}, + constructor: newRootCACertificatePublisherController, } } -func startRootCACertificatePublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newRootCACertificatePublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { rootCA, err := getKubeAPIServerCAFileContents(controllerContext) if err != nil { - return nil, true, err + return nil, err + } + + client, err := controllerContext.NewClient("root-ca-cert-publisher") + if err != nil { + return nil, err } sac, err := rootcacertpublisher.NewPublisher( controllerContext.InformerFactory.Core().V1().ConfigMaps(), controllerContext.InformerFactory.Core().V1().Namespaces(), - controllerContext.ClientBuilder.ClientOrDie("root-ca-cert-publisher"), + client, rootCA, ) if err != nil { - return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) + return nil, fmt.Errorf("error creating root CA certificate publisher: %w", err) } - go sac.Run(ctx, 1) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + sac.Run(ctx, 1) + }, controllerName), nil } func newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ name: names.KubeAPIServerClusterTrustBundlePublisherController, - initFunc: 
newKubeAPIServerSignerClusterTrustBundledPublisherController, + constructor: newKubeAPIServerSignerClusterTrustBundledPublisherController, requiredFeatureGates: []featuregate.Feature{features.ClusterTrustBundle}, } } type controllerConstructor func(string, dynamiccertificates.CAContentProvider, kubernetes.Interface) (ctbpublisher.PublisherRunner, error) -func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newKubeAPIServerSignerClusterTrustBundledPublisherController( + ctx context.Context, controllerContext ControllerContext, controllerName string, +) (Controller, error) { rootCA, err := getKubeAPIServerCAFileContents(controllerContext) if err != nil { - return nil, false, err + return nil, err } - - if len(rootCA) == 0 || !utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundle) { - return nil, false, nil + if len(rootCA) == 0 { + return nil, nil } servingSigners, err := dynamiccertificates.NewStaticCAContent("kube-apiserver-serving", rootCA) if err != nil { - return nil, false, fmt.Errorf("failed to create a static CA content provider for the kube-apiserver-serving signer: %w", err) + return nil, fmt.Errorf("failed to create a static CA content provider for the kube-apiserver-serving signer: %w", err) } schemaControllerMapping := map[schema.GroupVersion]controllerConstructor{ @@ -282,12 +317,16 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Co certificatesv1beta1.SchemeGroupVersion: ctbpublisher.NewBetaClusterTrustBundlePublisher, } - apiserverSignerClient := controllerContext.ClientBuilder.ClientOrDie("kube-apiserver-serving-clustertrustbundle-publisher") + apiserverSignerClient, err := controllerContext.NewClient("kube-apiserver-serving-clustertrustbundle-publisher") + if err != nil { + return nil, err + } + var runner ctbpublisher.PublisherRunner for _, gv := range 
[]schema.GroupVersion{certificatesv1beta1.SchemeGroupVersion, certificatesv1alpha1.SchemeGroupVersion} { ctbAvailable, err := clusterTrustBundlesAvailable(apiserverSignerClient, gv) if err != nil { - return nil, false, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err) + return nil, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err) } if !ctbAvailable { @@ -300,18 +339,17 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Co apiserverSignerClient, ) if err != nil { - return nil, false, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err) + return nil, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err) } break } if runner == nil { klog.Info("no known scheme version was found for clustertrustbundles, cannot start kube-apiserver-serving-clustertrustbundle-publisher-controller") - return nil, false, nil + return nil, nil } - go runner.Run(ctx) - return nil, true, nil + return newControllerLoop(runner.Run, controllerName), nil } func clusterTrustBundlesAvailable(client kubernetes.Interface, schemaVersion schema.GroupVersion) (bool, error) { @@ -334,7 +372,11 @@ func clusterTrustBundlesAvailable(client kubernetes.Interface, schemaVersion sch func getKubeAPIServerCAFileContents(controllerContext ControllerContext) ([]byte, error) { if controllerContext.ComponentConfig.SAController.RootCAFile == "" { - return controllerContext.ClientBuilder.ConfigOrDie("root-ca-cert-publisher").CAData, nil + config, err := controllerContext.NewClientConfig("root-ca-cert-publisher") + if err != nil { + return nil, err + } + return config.CAData, nil } rootCA, err := readCA(controllerContext.ComponentConfig.SAController.RootCAFile) diff --git a/cmd/kube-controller-manager/app/config/config.go b/cmd/kube-controller-manager/app/config/config.go index e841d109dd1..9031ba85030 100644 --- a/cmd/kube-controller-manager/app/config/config.go +++ 
b/cmd/kube-controller-manager/app/config/config.go @@ -24,6 +24,7 @@ import ( basecompatibility "k8s.io/component-base/compatibility" "k8s.io/component-base/zpages/flagz" kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config" + "time" ) // Config is the main context object for the controller manager. @@ -49,6 +50,8 @@ type Config struct { EventBroadcaster record.EventBroadcaster EventRecorder record.EventRecorder + ControllerShutdownTimeout time.Duration + // ComponentGlobalsRegistry is the registry where the effective versions and feature gates for all components are stored. ComponentGlobalsRegistry basecompatibility.ComponentGlobalsRegistry } diff --git a/cmd/kube-controller-manager/app/controller_descriptor.go b/cmd/kube-controller-manager/app/controller_descriptor.go new file mode 100644 index 00000000000..fa82992f817 --- /dev/null +++ b/cmd/kube-controller-manager/app/controller_descriptor.go @@ -0,0 +1,247 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "fmt" + "sort" + + "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/component-base/featuregate" + "k8s.io/klog/v2" +) + +// This file contains types and functions for wrapping controller implementations from downstream packages. 
+// Every controller wrapper implements the Controller interface, +// which is then associated with a ControllerDescriptor, which holds additional static metadata +// needed so that the manager can manage Controllers properly. + +// Controller defines the base interface that all controller wrappers must implement. +type Controller interface { + // Name returns the controller's canonical name. + Name() string + + // Run runs the controller loop. + // When there is anything to be done, it blocks until the context is cancelled. + // Run must ensure all goroutines are terminated before returning. + Run(context.Context) +} + +// ControllerConstructor is a constructor for a controller. +// A nil Controller returned means that the associated controller is disabled. +type ControllerConstructor func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) + +type ControllerDescriptor struct { + name string + constructor ControllerConstructor + requiredFeatureGates []featuregate.Feature + aliases []string + isDisabledByDefault bool + isCloudProviderController bool + requiresSpecialHandling bool +} + +func (r *ControllerDescriptor) Name() string { + return r.name +} + +func (r *ControllerDescriptor) GetControllerConstructor() ControllerConstructor { + return r.constructor +} + +func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature { + return append([]featuregate.Feature(nil), r.requiredFeatureGates...) +} + +// GetAliases returns aliases to ensure backwards compatibility and should never be removed! +// Only addition of new aliases is allowed, and only when a canonical name is changed (please see CHANGE POLICY of controller names) +func (r *ControllerDescriptor) GetAliases() []string { + return append([]string(nil), r.aliases...) 
+} + +func (r *ControllerDescriptor) IsDisabledByDefault() bool { + return r.isDisabledByDefault +} + +func (r *ControllerDescriptor) IsCloudProviderController() bool { + return r.isCloudProviderController +} + +// RequiresSpecialHandling should return true only for special non-generic controllers like ServiceAccountTokenController +func (r *ControllerDescriptor) RequiresSpecialHandling() bool { + return r.requiresSpecialHandling +} + +// BuildController creates a controller based on the given descriptor. +// The associated controller's constructor is called at the end, so the same contract applies for the return values here. +func (r *ControllerDescriptor) BuildController(ctx context.Context, controllerCtx ControllerContext) (Controller, error) { + logger := klog.FromContext(ctx) + controllerName := r.Name() + + for _, featureGate := range r.GetRequiredFeatureGates() { + if !utilfeature.DefaultFeatureGate.Enabled(featureGate) { + logger.Info("Controller is disabled by a feature gate", + "controller", controllerName, + "requiredFeatureGates", r.GetRequiredFeatureGates()) + return nil, nil + } + } + + if r.IsCloudProviderController() { + logger.Info("Skipping a cloud provider controller", "controller", controllerName) + return nil, nil + } + + ctx = klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)) + return r.GetControllerConstructor()(ctx, controllerCtx, controllerName) +} + +// KnownControllers returns the names of all known controllers +func KnownControllers() []string { + return sets.StringKeySet(NewControllerDescriptors()).List() +} + +// ControllerAliases returns a mapping of aliases to canonical controller names +func ControllerAliases() map[string]string { + aliases := map[string]string{} + for name, c := range NewControllerDescriptors() { + for _, alias := range c.GetAliases() { + aliases[alias] = name + } + } + return aliases +} + +func ControllersDisabledByDefault() []string { + var controllersDisabledByDefault []string + + for name, c := range
NewControllerDescriptors() { + if c.IsDisabledByDefault() { + controllersDisabledByDefault = append(controllersDisabledByDefault, name) + } + } + + sort.Strings(controllersDisabledByDefault) + + return controllersDisabledByDefault +} + +// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func) +// paired to their ControllerDescriptor wrapper object that includes the associated controller constructor. +// This allows for structured downstream composition and subdivision. +func NewControllerDescriptors() map[string]*ControllerDescriptor { + controllers := map[string]*ControllerDescriptor{} + aliases := sets.NewString() + + // All the controllers must fulfil common constraints, or else we will explode. + register := func(controllerDesc *ControllerDescriptor) { + if controllerDesc == nil { + panic("received nil controller for a registration") + } + name := controllerDesc.Name() + if len(name) == 0 { + panic("received controller without a name for a registration") + } + if _, found := controllers[name]; found { + panic(fmt.Sprintf("controller name %q was registered twice", name)) + } + if controllerDesc.GetControllerConstructor() == nil { + panic(fmt.Sprintf("controller %q does not have a constructor specified", name)) + } + + for _, alias := range controllerDesc.GetAliases() { + if aliases.Has(alias) { + panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias)) + } + aliases.Insert(alias) + } + + controllers[name] = controllerDesc + } + + // First add "special" controllers that aren't initialized normally. These controllers cannot be initialized + // in the main controller loop initialization, so we add them here only for the metadata and duplication detection. 
+ // app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers + // The only known special case is the ServiceAccountTokenController which *must* be started + // first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new + // special controllers. + register(newServiceAccountTokenControllerDescriptor(nil)) + + register(newEndpointsControllerDescriptor()) + register(newEndpointSliceControllerDescriptor()) + register(newEndpointSliceMirroringControllerDescriptor()) + register(newReplicationControllerDescriptor()) + register(newPodGarbageCollectorControllerDescriptor()) + register(newResourceQuotaControllerDescriptor()) + register(newNamespaceControllerDescriptor()) + register(newServiceAccountControllerDescriptor()) + register(newGarbageCollectorControllerDescriptor()) + register(newDaemonSetControllerDescriptor()) + register(newJobControllerDescriptor()) + register(newDeploymentControllerDescriptor()) + register(newReplicaSetControllerDescriptor()) + register(newHorizontalPodAutoscalerControllerDescriptor()) + register(newDisruptionControllerDescriptor()) + register(newStatefulSetControllerDescriptor()) + register(newCronJobControllerDescriptor()) + register(newCertificateSigningRequestSigningControllerDescriptor()) + register(newCertificateSigningRequestApprovingControllerDescriptor()) + register(newCertificateSigningRequestCleanerControllerDescriptor()) + register(newPodCertificateRequestCleanerControllerDescriptor()) + register(newTTLControllerDescriptor()) + register(newBootstrapSignerControllerDescriptor()) + register(newTokenCleanerControllerDescriptor()) + register(newNodeIpamControllerDescriptor()) + register(newNodeLifecycleControllerDescriptor()) + + register(newServiceLBControllerDescriptor()) // cloud provider controller + register(newNodeRouteControllerDescriptor()) // cloud provider controller + register(newCloudNodeLifecycleControllerDescriptor()) // cloud provider 
controller + + register(newPersistentVolumeBinderControllerDescriptor()) + register(newPersistentVolumeAttachDetachControllerDescriptor()) + register(newPersistentVolumeExpanderControllerDescriptor()) + register(newClusterRoleAggregrationControllerDescriptor()) + register(newPersistentVolumeClaimProtectionControllerDescriptor()) + register(newPersistentVolumeProtectionControllerDescriptor()) + register(newVolumeAttributesClassProtectionControllerDescriptor()) + register(newTTLAfterFinishedControllerDescriptor()) + register(newRootCACertificatePublisherControllerDescriptor()) + register(newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor()) + register(newEphemeralVolumeControllerDescriptor()) + + // feature gated + register(newStorageVersionGarbageCollectorControllerDescriptor()) + register(newResourceClaimControllerDescriptor()) + register(newDeviceTaintEvictionControllerDescriptor()) + register(newLegacyServiceAccountTokenCleanerControllerDescriptor()) + register(newValidatingAdmissionPolicyStatusControllerDescriptor()) + register(newTaintEvictionControllerDescriptor()) + register(newServiceCIDRsControllerDescriptor()) + register(newStorageVersionMigratorControllerDescriptor()) + register(newSELinuxWarningControllerDescriptor()) + + for _, alias := range aliases.UnsortedList() { + if _, ok := controllers[alias]; ok { + panic(fmt.Sprintf("alias %q conflicts with a controller name", alias)) + } + } + + return controllers +} diff --git a/cmd/kube-controller-manager/app/controller_utils.go b/cmd/kube-controller-manager/app/controller_utils.go new file mode 100644 index 00000000000..8a9ca1baa0e --- /dev/null +++ b/cmd/kube-controller-manager/app/controller_utils.go @@ -0,0 +1,67 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "sync" +) + +// This file contains utility functions for implementing controller wrappers, +// i.e. turning whatever logic into a Controller. + +type runFunc func(ctx context.Context) + +type runFuncSlice []runFunc + +func (rx runFuncSlice) Run(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(len(rx)) + for _, fnc := range rx { + go func() { + defer wg.Done() + fnc(ctx) + }() + } + wg.Wait() +} + +// concurrentRun returns a runFunc that wraps the given functions to run concurrently. +func concurrentRun(rx ...runFunc) runFunc { + return runFuncSlice(rx).Run +} + +// controllerLoop implements the Controller interface. It makes it easy to turn a function into a Controller. 
+type controllerLoop struct { + name string + run runFunc +} + +func newControllerLoop(run runFunc, controllerName string) *controllerLoop { + return &controllerLoop{ + name: controllerName, + run: run, + } +} + +func (loop *controllerLoop) Name() string { + return loop.name +} + +func (loop *controllerLoop) Run(ctx context.Context) { + loop.run(ctx) +} diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 4bca8b0278d..572a34e868b 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -25,7 +25,7 @@ import ( "math/rand" "net/http" "os" - "sort" + "sync" "time" "github.com/blang/semver/v4" @@ -43,6 +43,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/metadatainformer" @@ -51,7 +52,6 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" certutil "k8s.io/client-go/util/cert" - "k8s.io/client-go/util/keyutil" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" basecompatibility "k8s.io/component-base/compatibility" @@ -80,9 +80,7 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/names" kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config" garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector" - serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" kubefeatures "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/serviceaccount" ) func init() { @@ -253,16 +251,27 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if err := StartControllers(ctx, controllerContext, controllerDescriptors, 
unsecuredMux, healthzHandler); err != nil { - logger.Error(err, "Error starting controllers") + // Prepare all controllers in advance. + controllers, err := BuildControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler) + if err != nil { + logger.Error(err, "Error building controllers") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } + // Start the informers. + stopCh := ctx.Done() controllerContext.InformerFactory.Start(stopCh) controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh) close(controllerContext.InformersStarted) - <-ctx.Done() + // Actually start the controllers. + if len(controllers) > 0 { + if !RunControllers(ctx, controllerContext, controllers, ControllerStartJitter, c.ControllerShutdownTimeout) { + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + } else { + <-ctx.Done() + } } // No leader election, run directly @@ -291,14 +300,23 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration, kubeControllerManager) - // startSATokenControllerInit is the original InitFunc. - startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc() + // saTokenControllerInit is the original constructor. + saTokenControllerInit := saTokenControllerDescriptor.GetControllerConstructor() - // Wrap saTokenControllerDescriptor to signal readiness for migration after starting - // the controller. - saTokenControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - defer close(leaderMigrator.MigrationReady) - return startSATokenControllerInit(ctx, controllerContext, controllerName) + // Wrap saTokenControllerDescriptor to signal readiness for migration after starting the controller.
+ saTokenControllerDescriptor.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + ctrl, err := saTokenControllerInit(ctx, controllerContext, controllerName) + if err != nil { + return nil, err + } + + // This wrapping is not exactly flawless as RunControllers uses type casting, + // which is now not possible for the wrapped controller. + // This fortunately doesn't matter for this particular controller. + return newControllerLoop(func(ctx context.Context) { + close(leaderMigrator.MigrationReady) + ctrl.Run(ctx) + }, controllerName), nil } } @@ -431,188 +449,22 @@ func (c ControllerContext) IsControllerEnabled(controllerDescriptor *ControllerD return genericcontrollermanager.IsControllerEnabled(controllerDescriptor.Name(), controllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) } -// InitFunc is used to launch a particular controller. It returns a controller -// that can optionally implement other interfaces so that the controller manager -// can support the requested features. -// The returned controller may be nil, which will be considered an anonymous controller -// that requests no additional features from the controller manager. -// Any error returned will cause the controller process to `Fatal` -// The bool indicates whether the controller was enabled. 
-type InitFunc func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controller.Interface, enabled bool, err error) - -type ControllerDescriptor struct { - name string - initFunc InitFunc - requiredFeatureGates []featuregate.Feature - aliases []string - isDisabledByDefault bool - isCloudProviderController bool - requiresSpecialHandling bool -} - -func (r *ControllerDescriptor) Name() string { - return r.name -} - -func (r *ControllerDescriptor) GetInitFunc() InitFunc { - return r.initFunc -} - -func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature { - return append([]featuregate.Feature(nil), r.requiredFeatureGates...) -} - -// GetAliases returns aliases to ensure backwards compatibility and should never be removed! -// Only addition of new aliases is allowed, and only when a canonical name is changed (please see CHANGE POLICY of controller names) -func (r *ControllerDescriptor) GetAliases() []string { - return append([]string(nil), r.aliases...) -} - -func (r *ControllerDescriptor) IsDisabledByDefault() bool { - return r.isDisabledByDefault -} - -func (r *ControllerDescriptor) IsCloudProviderController() bool { - return r.isCloudProviderController -} - -// RequiresSpecialHandling should return true only in a special non-generic controllers like ServiceAccountTokenController -func (r *ControllerDescriptor) RequiresSpecialHandling() bool { - return r.requiresSpecialHandling -} - -// KnownControllers returns all known controllers's name -func KnownControllers() []string { - return sets.StringKeySet(NewControllerDescriptors()).List() -} - -// ControllerAliases returns a mapping of aliases to canonical controller names -func ControllerAliases() map[string]string { - aliases := map[string]string{} - for name, c := range NewControllerDescriptors() { - for _, alias := range c.GetAliases() { - aliases[alias] = name - } +// NewClientConfig is a shortcut for ClientBuilder.Config. 
It wraps the error with an additional message. +func (c ControllerContext) NewClientConfig(name string) (*restclient.Config, error) { + config, err := c.ClientBuilder.Config(name) + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client config for %q: %w", name, err) } - return aliases + return config, nil } -func ControllersDisabledByDefault() []string { - var controllersDisabledByDefault []string - - for name, c := range NewControllerDescriptors() { - if c.IsDisabledByDefault() { - controllersDisabledByDefault = append(controllersDisabledByDefault, name) - } +// NewClient is a shortcut for ClientBuilder.Client. It wraps the error with an additional message. +func (c ControllerContext) NewClient(name string) (kubernetes.Interface, error) { + client, err := c.ClientBuilder.Client(name) + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client for %q: %w", name, err) } - - sort.Strings(controllersDisabledByDefault) - - return controllersDisabledByDefault -} - -// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func) -// paired to their ControllerDescriptor wrapper object that includes InitFunc. -// This allows for structured downstream composition and subdivision. -func NewControllerDescriptors() map[string]*ControllerDescriptor { - controllers := map[string]*ControllerDescriptor{} - aliases := sets.NewString() - - // All the controllers must fulfil common constraints, or else we will explode. 
- register := func(controllerDesc *ControllerDescriptor) { - if controllerDesc == nil { - panic("received nil controller for a registration") - } - name := controllerDesc.Name() - if len(name) == 0 { - panic("received controller without a name for a registration") - } - if _, found := controllers[name]; found { - panic(fmt.Sprintf("controller name %q was registered twice", name)) - } - if controllerDesc.GetInitFunc() == nil { - panic(fmt.Sprintf("controller %q does not have an init function", name)) - } - - for _, alias := range controllerDesc.GetAliases() { - if aliases.Has(alias) { - panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias)) - } - aliases.Insert(alias) - } - - controllers[name] = controllerDesc - } - - // First add "special" controllers that aren't initialized normally. These controllers cannot be initialized - // in the main controller loop initialization, so we add them here only for the metadata and duplication detection. - // app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers - // The only known special case is the ServiceAccountTokenController which *must* be started - // first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new - // special controllers. 
- register(newServiceAccountTokenControllerDescriptor(nil)) - - register(newEndpointsControllerDescriptor()) - register(newEndpointSliceControllerDescriptor()) - register(newEndpointSliceMirroringControllerDescriptor()) - register(newReplicationControllerDescriptor()) - register(newPodGarbageCollectorControllerDescriptor()) - register(newResourceQuotaControllerDescriptor()) - register(newNamespaceControllerDescriptor()) - register(newServiceAccountControllerDescriptor()) - register(newGarbageCollectorControllerDescriptor()) - register(newDaemonSetControllerDescriptor()) - register(newJobControllerDescriptor()) - register(newDeploymentControllerDescriptor()) - register(newReplicaSetControllerDescriptor()) - register(newHorizontalPodAutoscalerControllerDescriptor()) - register(newDisruptionControllerDescriptor()) - register(newStatefulSetControllerDescriptor()) - register(newCronJobControllerDescriptor()) - register(newCertificateSigningRequestSigningControllerDescriptor()) - register(newCertificateSigningRequestApprovingControllerDescriptor()) - register(newCertificateSigningRequestCleanerControllerDescriptor()) - register(newPodCertificateRequestCleanerControllerDescriptor()) - register(newTTLControllerDescriptor()) - register(newBootstrapSignerControllerDescriptor()) - register(newTokenCleanerControllerDescriptor()) - register(newNodeIpamControllerDescriptor()) - register(newNodeLifecycleControllerDescriptor()) - - register(newServiceLBControllerDescriptor()) // cloud provider controller - register(newNodeRouteControllerDescriptor()) // cloud provider controller - register(newCloudNodeLifecycleControllerDescriptor()) // cloud provider controller - - register(newPersistentVolumeBinderControllerDescriptor()) - register(newPersistentVolumeAttachDetachControllerDescriptor()) - register(newPersistentVolumeExpanderControllerDescriptor()) - register(newClusterRoleAggregrationControllerDescriptor()) - register(newPersistentVolumeClaimProtectionControllerDescriptor()) - 
register(newPersistentVolumeProtectionControllerDescriptor()) - register(newVolumeAttributesClassProtectionControllerDescriptor()) - register(newTTLAfterFinishedControllerDescriptor()) - register(newRootCACertificatePublisherControllerDescriptor()) - register(newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor()) - register(newEphemeralVolumeControllerDescriptor()) - - // feature gated - register(newStorageVersionGarbageCollectorControllerDescriptor()) - register(newResourceClaimControllerDescriptor()) - register(newDeviceTaintEvictionControllerDescriptor()) - register(newLegacyServiceAccountTokenCleanerControllerDescriptor()) - register(newValidatingAdmissionPolicyStatusControllerDescriptor()) - register(newTaintEvictionControllerDescriptor()) - register(newServiceCIDRsControllerDescriptor()) - register(newStorageVersionMigratorControllerDescriptor()) - register(newSELinuxWarningControllerDescriptor()) - - for _, alias := range aliases.UnsortedList() { - if _, ok := controllers[alias]; ok { - panic(fmt.Sprintf("alias %q conflicts with a controller name", alias)) - } - } - - return controllers + return client, nil } // CreateControllerContext creates a context struct containing references to resources needed by the @@ -629,20 +481,37 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo return obj, nil } - versionedClient := rootClientBuilder.ClientOrDie("shared-informers") + versionedClient, err := rootClientBuilder.Client("shared-informers") + if err != nil { + return ControllerContext{}, fmt.Errorf("failed to create Kubernetes client for %q: %w", "shared-informers", err) + } + sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim)) - metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers")) + metadataConfig, err := rootClientBuilder.Config("metadata-informers") + if err != nil { + return ControllerContext{}, 
fmt.Errorf("failed to create metadata client config: %w", err) + } + + metadataClient, err := metadata.NewForConfig(metadataConfig) + if err != nil { + return ControllerContext{}, fmt.Errorf("failed to create metadata client: %w", err) + } + metadataInformers := metadatainformer.NewSharedInformerFactoryWithOptions(metadataClient, ResyncPeriod(s)(), metadatainformer.WithTransform(trim)) // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil { - return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err) + return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %w", err) } // Use a discovery client capable of being refreshed. - discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery") + discoveryClient, err := rootClientBuilder.DiscoveryClient("controller-discovery") + if err != nil { + return ControllerContext{}, fmt.Errorf("failed to create discovery client: %w", err) + } + cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient) restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient) go wait.Until(func() { @@ -681,95 +550,38 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo return controllerContext, nil } -// StartControllers starts a set of controllers with a specified ControllerContext -func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor, - unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error { - var controllerChecks []healthz.HealthChecker - - // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest - // If this fails, just 
return here and fail since other controllers won't be able to get credentials. - if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok { - check, err := StartController(ctx, controllerCtx, serviceAccountTokenControllerDescriptor, unsecuredMux) - if err != nil { - return err - } - if check != nil { - // HealthChecker should be present when controller has started - controllerChecks = append(controllerChecks, check) - } - } - - // Each controller is passed a context where the logger has the name of - // the controller set through WithName. That name then becomes the prefix of - // of all log messages emitted by that controller. - // - // In StartController, an explicit "controller" key is used instead, for two reasons: - // - while contextual logging is alpha, klog.LoggerWithName is still a no-op, - // so we cannot rely on it yet to add the name - // - it allows distinguishing between log entries emitted by the controller - // and those emitted for it - this is a bit debatable and could be revised. - for _, controllerDesc := range controllerDescriptors { - if controllerDesc.RequiresSpecialHandling() { - continue - } - - check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux) - if err != nil { - return err - } - if check != nil { - // HealthChecker should be present when controller has started - controllerChecks = append(controllerChecks, check) - } - } - - healthzHandler.AddHealthChecker(controllerChecks...) - - return nil +// HealthCheckAdder is an interface to represent a healthz handler. +// The extra level of indirection is useful for testing. 
+type HealthCheckAdder interface { + AddHealthChecker(checks ...healthz.HealthChecker) } -// StartController starts a controller with a specified ControllerContext -// and performs required pre- and post- checks/actions -func StartController(ctx context.Context, controllerCtx ControllerContext, controllerDescriptor *ControllerDescriptor, - unsecuredMux *mux.PathRecorderMux) (healthz.HealthChecker, error) { +// BuildControllers builds all controllers in the given descriptor map. Disabled controllers are obviously skipped. +// +// A health check is registered for each controller using the controller name. The default check always passes. +// If the controller implements controller.HealthCheckable, though, the given check is used. +// The controller can also implement controller.Debuggable, in which case the debug handler is registered with the given mux. +func BuildControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor, + unsecuredMux *mux.PathRecorderMux, healthzHandler HealthCheckAdder) ([]Controller, error) { logger := klog.FromContext(ctx) - controllerName := controllerDescriptor.Name() - - for _, featureGate := range controllerDescriptor.GetRequiredFeatureGates() { - if !utilfeature.DefaultFeatureGate.Enabled(featureGate) { - logger.Info("Controller is disabled by a feature gate", "controller", controllerName, "requiredFeatureGates", controllerDescriptor.GetRequiredFeatureGates()) - return nil, nil + var ( + controllers []Controller + checks []healthz.HealthChecker + ) + buildController := func(controllerDesc *ControllerDescriptor) error { + controllerName := controllerDesc.Name() + ctrl, err := controllerDesc.BuildController(ctx, controllerCtx) + if err != nil { + logger.Error(err, "Error initializing a controller", "controller", controllerName) + return err + } + if ctrl == nil { + logger.Info("Warning: skipping controller", "controller", controllerName) + return nil } - } - if 
controllerDescriptor.IsCloudProviderController() { - logger.Info("Skipping a cloud provider controller", "controller", controllerName) - return nil, nil - } - - if !controllerCtx.IsControllerEnabled(controllerDescriptor) { - logger.Info("Warning: controller is disabled", "controller", controllerName) - return nil, nil - } - - time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) - - logger.V(1).Info("Starting controller", "controller", controllerName) - - initFunc := controllerDescriptor.GetInitFunc() - ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName) - if err != nil { - logger.Error(err, "Error starting controller", "controller", controllerName) - return nil, err - } - if !started { - logger.Info("Warning: skipping controller", "controller", controllerName) - return nil, nil - } - - check := controllerhealthz.NamedPingChecker(controllerName) - if ctrl != nil { - // check if the controller supports and requests a debugHandler + check := controllerhealthz.NamedPingChecker(controllerName) + // check if the controller supports and requests a debugHandler, // and it needs the unsecuredMux to mount the handler onto. if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil { if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil { @@ -783,69 +595,152 @@ func StartController(ctx context.Context, controllerCtx ControllerContext, contr check = controllerhealthz.NamedHealthChecker(controllerName, realCheck) } } + + controllers = append(controllers, ctrl) + checks = append(checks, check) + return nil } - logger.Info("Started controller", "controller", controllerName) - return check, nil -} - -// serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers. -// It cannot use the "normal" client builder, so it tracks its own. 
-func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor { - return &ControllerDescriptor{ - name: names.ServiceAccountTokenController, - aliases: []string{"serviceaccount-token"}, - initFunc: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - return startServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder) - }, - // will make sure it runs first before other controllers - requiresSpecialHandling: true, - } -} - -func startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext, controllerName string, rootClientBuilder clientbuilder.ControllerClientBuilder) (controller.Interface, bool, error) { - logger := klog.FromContext(ctx) - if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 { - logger.Info("Controller is disabled because there is no private key", "controller", controllerName) - return nil, false, nil - } - privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) - if err != nil { - return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err) - } - - var rootCA []byte - if controllerContext.ComponentConfig.SAController.RootCAFile != "" { - if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil { - return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err) + // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest + // If this fails, just return here and fail since other controllers won't be able to get credentials. 
+ if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok { + if err := buildController(serviceAccountTokenControllerDescriptor); err != nil { + return nil, err } - } else { - rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData } - tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey) - if err != nil { - return nil, false, fmt.Errorf("failed to build token generator: %v", err) - } - tokenController, err := serviceaccountcontroller.NewTokensController( - logger, - controllerContext.InformerFactory.Core().V1().ServiceAccounts(), - controllerContext.InformerFactory.Core().V1().Secrets(), - rootClientBuilder.ClientOrDie("tokens-controller"), - serviceaccountcontroller.TokensControllerOptions{ - TokenGenerator: tokenGenerator, - RootCA: rootCA, - }, - ) - if err != nil { - return nil, true, fmt.Errorf("error creating Tokens controller: %v", err) - } - go tokenController.Run(ctx, int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs)) + // Each controller is passed a context where the logger has the name of + // the controller set through WithName. That name then becomes the prefix of + // all log messages emitted by that controller. + // + // In StartController, an explicit "controller" key is used instead, for two reasons: + // - while contextual logging is alpha, klog.LoggerWithName is still a no-op, + // so we cannot rely on it yet to add the name + // - it allows distinguishing between log entries emitted by the controller + // and those emitted for it - this is a bit debatable and could be revised. 
+ for _, controllerDesc := range controllerDescriptors { + if controllerDesc.RequiresSpecialHandling() { + continue + } - // start the first set of informers now so that other controllers can start - controllerContext.InformerFactory.Start(ctx.Done()) + if !controllerCtx.IsControllerEnabled(controllerDesc) { + logger.Info("Warning: controller is disabled", "controller", controllerDesc.Name()) + continue + } - return nil, true, nil + if err := buildController(controllerDesc); err != nil { + return nil, err + } + } + + // Register the checks. + if len(checks) > 0 { + healthzHandler.AddHealthChecker(checks...) + } + return controllers, nil +} + +// RunControllers runs all controllers concurrently and blocks until the context is cancelled and all controllers are terminated. +// +// Once the context is cancelled, RunControllers waits for shutdownTimeout for all controllers to terminate. +// When the timeout is reached, the function unblocks and returns false. +// Zero shutdown timeout means that there is no timeout. +func RunControllers(ctx context.Context, controllerCtx ControllerContext, controllers []Controller, + controllerStartJitterMaxFactor float64, shutdownTimeout time.Duration) bool { + logger := klog.FromContext(ctx) + + // We gather running controllers names for logging purposes. + // When the context is cancelled, the controllers still running are logged periodically. + runningControllers := sets.New[string]() + var runningControllersLock sync.Mutex + + loggingCtx, cancelLoggingCtx := context.WithCancel(context.Background()) + defer cancelLoggingCtx() + go func() { + // Only start logging when terminating. + select { + case <-ctx.Done(): + case <-loggingCtx.Done(): + return + } + + // Regularly print the controllers that still haven't returned. 
+ logPeriod := shutdownTimeout / 3 + if logPeriod == 0 { + logPeriod = 5 * time.Second + } + ticker := time.NewTicker(logPeriod) + defer ticker.Stop() + for { + select { + case <-ticker.C: + runningControllersLock.Lock() + running := sets.List(runningControllers) + runningControllersLock.Unlock() + + logger.Info("Still waiting for some controllers to terminate...", "runningControllers", running) + + case <-loggingCtx.Done(): + return + } + } + }() + + terminatedCh := make(chan struct{}) + go func() { + defer close(terminatedCh) + var wg sync.WaitGroup + wg.Add(len(controllers)) + for _, controller := range controllers { + go func() { + defer wg.Done() + + // It would be better to unblock and return on context cancelled here, + // but that makes tests more flaky regarding timing. + time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, controllerStartJitterMaxFactor)) + + logger.V(1).Info("Controller starting...", "controller", controller.Name()) + + runningControllersLock.Lock() + runningControllers.Insert(controller.Name()) + runningControllersLock.Unlock() + + defer func() { + logger.V(1).Info("Controller terminated", "controller", controller.Name()) + + runningControllersLock.Lock() + runningControllers.Delete(controller.Name()) + runningControllersLock.Unlock() + }() + controller.Run(ctx) + }() + } + wg.Wait() + logger.Info("All controllers terminated") + }() + + // Wait for a signal to terminate. + select { + case <-ctx.Done(): + case <-terminatedCh: + return true + } + + // Wait for the shutdown timeout. 
+ var shutdownCh <-chan time.Time + if shutdownTimeout > 0 { + shutdownCh = time.After(shutdownTimeout) + } + select { + case <-terminatedCh: + return true + case <-shutdownCh: + runningControllersLock.Lock() + running := sets.List(runningControllers) + runningControllersLock.Unlock() + logger.Info("Controller shutdown timeout reached", "timeout", shutdownTimeout, "runningControllers", running) + return false + } } func readCA(file string) ([]byte, error) { diff --git a/cmd/kube-controller-manager/app/controllermanager_test.go b/cmd/kube-controller-manager/app/controllermanager_test.go index ae5844049b0..67a03e21946 100644 --- a/cmd/kube-controller-manager/app/controllermanager_test.go +++ b/cmd/kube-controller-manager/app/controllermanager_test.go @@ -21,16 +21,17 @@ import ( "regexp" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apiserver/pkg/server/healthz" utilfeature "k8s.io/apiserver/pkg/util/feature" cpnames "k8s.io/cloud-provider/names" "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" - controllermanagercontroller "k8s.io/controller-manager/controller" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/features" @@ -206,19 +207,21 @@ func TestTaintEvictionControllerGating(t *testing.T) { initFuncCalled := false taintEvictionControllerDescriptor := NewControllerDescriptors()[names.TaintEvictionController] - taintEvictionControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controllermanagercontroller.Interface, enabled bool, err error) { + taintEvictionControllerDescriptor.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { initFuncCalled = true - return nil, true, nil + return newControllerLoop(func(ctx 
context.Context) {}, controllerName), nil } - healthCheck, err := StartController(ctx, controllerCtx, taintEvictionControllerDescriptor, nil) - if err != nil { + var healthChecks mockHealthCheckAdder + if err := runControllers(ctx, controllerCtx, map[string]*ControllerDescriptor{ + names.TaintEvictionController: taintEvictionControllerDescriptor, + }, &healthChecks); err != nil { t.Errorf("starting a TaintEvictionController controller should not return an error") } if test.expectInitFuncCall != initFuncCalled { t.Errorf("TaintEvictionController init call check failed: expected=%v, got=%v", test.expectInitFuncCall, initFuncCalled) } - hasHealthCheck := healthCheck != nil + hasHealthCheck := len(healthChecks.Checks) > 0 expectHealthCheck := test.expectInitFuncCall if expectHealthCheck != hasHealthCheck { t.Errorf("TaintEvictionController healthCheck check failed: expected=%v, got=%v", expectHealthCheck, hasHealthCheck) @@ -229,23 +232,96 @@ func TestTaintEvictionControllerGating(t *testing.T) { func TestNoCloudProviderControllerStarted(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - controllerCtx := ControllerContext{} controllerCtx.ComponentConfig.Generic.Controllers = []string{"*"} - for _, controller := range NewControllerDescriptors() { + cpControllerDescriptors := make(map[string]*ControllerDescriptor) + for controllerName, controller := range NewControllerDescriptors() { if !controller.IsCloudProviderController() { continue } - controllerName := controller.Name() - checker, err := StartController(ctx, controllerCtx, controller, nil) - if err != nil { - t.Errorf("Error starting controller %q: %v", controllerName, err) - } - if checker != nil { - t.Errorf("Controller %q should not be started", controllerName) + controller.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + return newControllerLoop(func(ctx context.Context) { + 
t.Error("Controller should not be started:", controllerName) + }, controllerName), nil } + + cpControllerDescriptors[controllerName] = controller + } + + var healthChecks mockHealthCheckAdder + if err := runControllers(ctx, controllerCtx, cpControllerDescriptors, &healthChecks); err != nil { + t.Error("Failed to start controllers:", err) } } + +func TestRunControllers(t *testing.T) { + testCases := []struct { + name string + newController func(ctx context.Context) Controller + shutdownTimeout time.Duration + expectedCleanTermination bool + }{ + { + name: "clean shutdown", + newController: func(testCtx context.Context) Controller { + return newControllerLoop(func(ctx context.Context) { + <-ctx.Done() + }, "controller-A") + }, + shutdownTimeout: 10 * time.Second, + expectedCleanTermination: true, + }, + { + name: "shutdown timeout", + newController: func(testCtx context.Context) Controller { + return newControllerLoop(func(ctx context.Context) { + <-testCtx.Done() + }, "controller-A") + }, + shutdownTimeout: 50 * time.Millisecond, + expectedCleanTermination: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + controllerCtx := ControllerContext{} + + // testCtx is used to make sure the controller failing to shut down can exit after the test is finished. + testCtx, cancelTest := context.WithCancel(ctx) + defer cancelTest() + + // ctx is used to wait in the controller for shutdown and also to start the shutdown timeout in RunControllers. + // To start the shutdown timeout immediately, we start with a cancelled context already. 
+ ctx, cancelController := context.WithCancel(ctx) + cancelController() + + cleanShutdown := RunControllers(ctx, controllerCtx, []Controller{tc.newController(testCtx)}, 0, tc.shutdownTimeout) + if cleanShutdown != tc.expectedCleanTermination { + t.Errorf("expected clean shutdown %v, got %v", tc.expectedCleanTermination, cleanShutdown) + } + }) + } +} + +type mockHealthCheckAdder struct { + Checks []healthz.HealthChecker +} + +func (m *mockHealthCheckAdder) AddHealthChecker(checks ...healthz.HealthChecker) { + m.Checks = append(m.Checks, checks...) +} + +func runControllers( + ctx context.Context, controllerCtx ControllerContext, + controllerDescriptors map[string]*ControllerDescriptor, healthzChecks HealthCheckAdder, +) error { + controllers, err := BuildControllers(ctx, controllerCtx, controllerDescriptors, nil, healthzChecks) + if err != nil { + return err + } + RunControllers(ctx, controllerCtx, controllers, 0, 0) + return nil +} diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 8cf5465578a..dfef26d5902 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -31,6 +31,7 @@ import ( genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/quota/v1/generic" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/discovery" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/metadata" restclient "k8s.io/client-go/rest" @@ -82,47 +83,43 @@ const ( func newServiceLBControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: cpnames.ServiceLBController, - aliases: []string{"service"}, - initFunc: startServiceLBController, + name: cpnames.ServiceLBController, + aliases: []string{"service"}, + constructor: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + logger := klog.FromContext(ctx) + logger.Info("Warning: service-controller is set, but no cloud provider 
functionality is available in kube-controller-manger (KEP-2395). Will not configure service controller.") + return nil, nil + }, isCloudProviderController: true, } } -func startServiceLBController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - logger := klog.FromContext(ctx) - logger.Info("Warning: service-controller is set, but no cloud provider functionality is available in kube-controller-manger (KEP-2395). Will not configure service controller.") - return nil, false, nil -} func newNodeIpamControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.NodeIpamController, - aliases: []string{"nodeipam"}, - initFunc: startNodeIpamController, + name: names.NodeIpamController, + aliases: []string{"nodeipam"}, + constructor: newNodeIpamController, } } -func startNodeIpamController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - var serviceCIDR *net.IPNet - var secondaryServiceCIDR *net.IPNet - logger := klog.FromContext(ctx) - - // should we start nodeIPAM +func newNodeIpamController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs { - return nil, false, nil + return nil, nil } - if controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType == string(ipam.CloudAllocatorType) { // Cannot run cloud ipam controller if cloud provider is nil (--cloud-provider not set or set to 'external') - return nil, false, errors.New("--cidr-allocator-type is set to 'CloudAllocator' but cloud provider is not configured") + return nil, errors.New("--cidr-allocator-type is set to 'CloudAllocator' but cloud provider is not configured") } clusterCIDRs, err := validateCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR) if err != nil { - return nil, false, err + return 
nil, err } // service cidr processing + var serviceCIDR *net.IPNet + var secondaryServiceCIDR *net.IPNet + logger := klog.FromContext(ctx) if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 { _, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR) if err != nil { @@ -142,10 +139,10 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo // should be dual stack (from different IPFamilies) dualstackServiceCIDR, err := netutils.IsDualStackCIDRs([]*net.IPNet{serviceCIDR, secondaryServiceCIDR}) if err != nil { - return nil, false, fmt.Errorf("failed to perform dualstack check on serviceCIDR and secondaryServiceCIDR error: %w", err) + return nil, fmt.Errorf("failed to perform dualstack check on serviceCIDR and secondaryServiceCIDR error: %w", err) } if !dualstackServiceCIDR { - return nil, false, fmt.Errorf("serviceCIDR and secondaryServiceCIDR are not dualstack (from different IPfamiles)") + return nil, fmt.Errorf("serviceCIDR and secondaryServiceCIDR are not dualstack (from different IPfamiles)") } } @@ -153,14 +150,19 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo // --node-cidr-mask-size flag is incompatible with dual stack clusters. 
nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(controllerContext.ComponentConfig.NodeIPAMController, clusterCIDRs) if err != nil { - return nil, false, err + return nil, err + } + + client, err := controllerContext.NewClient("node-controller") + if err != nil { + return nil, err } nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( ctx, controllerContext.InformerFactory.Core().V1().Nodes(), nil, // no cloud provider on kube-controller-manager since v1.31 (KEP-2395) - controllerContext.ClientBuilder.ClientOrDie("node-controller"), + client, clusterCIDRs, serviceCIDR, secondaryServiceCIDR, @@ -168,29 +170,36 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType), ) if err != nil { - return nil, true, err + return nil, err } - go nodeIpamController.RunWithMetrics(ctx, controllerContext.ControllerManagerMetrics) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + nodeIpamController.RunWithMetrics(ctx, controllerContext.ControllerManagerMetrics) + }, controllerName), nil } func newNodeLifecycleControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.NodeLifecycleController, - aliases: []string{"nodelifecycle"}, - initFunc: startNodeLifecycleController, + name: names.NodeLifecycleController, + aliases: []string{"nodelifecycle"}, + constructor: newNodeLifecycleController, } } -func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController( +func newNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("node-controller") + if err != nil { + return nil, err + } + + nlc, err := 
lifecyclecontroller.NewNodeLifecycleController( ctx, controllerContext.InformerFactory.Coordination().V1().Leases(), controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Apps().V1().DaemonSets(), // node lifecycle controller uses existing cluster role from node-controller - controllerContext.ClientBuilder.ClientOrDie("node-controller"), + client, controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, controllerContext.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration, controllerContext.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration, @@ -200,42 +209,49 @@ func startNodeLifecycleController(ctx context.Context, controllerContext Control controllerContext.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold, ) if err != nil { - return nil, true, err + return nil, err } - go lifecycleController.Run(ctx) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + nlc.Run(ctx) + }, controllerName), nil } func newTaintEvictionControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.TaintEvictionController, - initFunc: startTaintEvictionController, + name: names.TaintEvictionController, + constructor: newTaintEvictionController, requiredFeatureGates: []featuregate.Feature{ features.SeparateTaintEvictionController, }, } } -func startTaintEvictionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - taintEvictionController, err := tainteviction.New( +func newTaintEvictionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + // taint-manager uses existing cluster role from node-controller + client, err := controllerContext.NewClient("node-controller") + if err != nil { + return nil, err + } + + tec, err := 
tainteviction.New( ctx, - // taint-manager uses existing cluster role from node-controller - controllerContext.ClientBuilder.ClientOrDie("node-controller"), + client, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Nodes(), controllerName, ) if err != nil { - return nil, false, err + return nil, err } - go taintEvictionController.Run(ctx) - return nil, true, nil + + return newControllerLoop(tec.Run, controllerName), nil } func newDeviceTaintEvictionControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.DeviceTaintEvictionController, - initFunc: startDeviceTaintEvictionController, + name: names.DeviceTaintEvictionController, + constructor: newDeviceTaintEvictionController, requiredFeatureGates: []featuregate.Feature{ // TODO update app.TestFeatureGatedControllersShouldNotDefineAliases when removing these feature gates. features.DynamicResourceAllocation, @@ -244,9 +260,14 @@ func newDeviceTaintEvictionControllerDescriptor() *ControllerDescriptor { } } -func startDeviceTaintEvictionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newDeviceTaintEvictionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient(names.DeviceTaintEvictionController) + if err != nil { + return nil, err + } + deviceTaintEvictionController := devicetainteviction.New( - controllerContext.ClientBuilder.ClientOrDie(names.DeviceTaintEvictionController), + client, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Resource().V1().ResourceClaims(), controllerContext.InformerFactory.Resource().V1().ResourceSlices(), @@ -254,61 +275,62 @@ func startDeviceTaintEvictionController(ctx context.Context, controllerContext C controllerContext.InformerFactory.Resource().V1().DeviceClasses(), 
controllerName, ) - go func() { + return newControllerLoop(func(ctx context.Context) { if err := deviceTaintEvictionController.Run(ctx); err != nil { klog.FromContext(ctx).Error(err, "Device taint processing leading to Pod eviction failed and is now paused") } - }() - return nil, true, nil + <-ctx.Done() + }, controllerName), nil } func newCloudNodeLifecycleControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: cpnames.CloudNodeLifecycleController, - aliases: []string{"cloud-node-lifecycle"}, - initFunc: startCloudNodeLifecycleController, + name: cpnames.CloudNodeLifecycleController, + aliases: []string{"cloud-node-lifecycle"}, + constructor: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + logger := klog.FromContext(ctx) + logger.Info("Warning: node-controller is set, but no cloud provider functionality is available in kube-controller-manger (KEP-2395). Will not configure node lifecyle controller.") + return nil, nil + }, isCloudProviderController: true, } } -func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - logger := klog.FromContext(ctx) - logger.Info("Warning: node-controller is set, but no cloud provider functionality is available in kube-controller-manger (KEP-2395). 
Will not configure node lifecyle controller.") - return nil, false, nil -} - func newNodeRouteControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: cpnames.NodeRouteController, - aliases: []string{"route"}, - initFunc: startNodeRouteController, + name: cpnames.NodeRouteController, + aliases: []string{"route"}, + constructor: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + logger := klog.FromContext(ctx) + logger.Info("Warning: configure-cloud-routes is set, but no cloud provider functionality is available in kube-controller-manger (KEP-2395). Will not configure cloud provider routes.") + return nil, nil + }, isCloudProviderController: true, } } -func startNodeRouteController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - logger := klog.FromContext(ctx) - logger.Info("Warning: configure-cloud-routes is set, but no cloud provider functionality is available in kube-controller-manger (KEP-2395). 
Will not configure cloud provider routes.") - return nil, false, nil -} - func newPersistentVolumeBinderControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.PersistentVolumeBinderController, - aliases: []string{"persistentvolume-binder"}, - initFunc: startPersistentVolumeBinderController, + name: names.PersistentVolumeBinderController, + aliases: []string{"persistentvolume-binder"}, + constructor: newPersistentVolumeBinderController, } } -func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { logger := klog.FromContext(ctx) plugins, err := ProbeProvisionableRecyclableVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) if err != nil { - return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err) + return nil, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %w", err) + } + + client, err := controllerContext.NewClient("persistent-volume-binder") + if err != nil { + return nil, err } params := persistentvolumecontroller.ControllerParameters{ - KubeClient: controllerContext.ClientBuilder.ClientOrDie("persistent-volume-binder"), + KubeClient: client, SyncPeriod: controllerContext.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration, VolumePlugins: plugins, VolumeInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumes(), @@ -318,211 +340,261 @@ func startPersistentVolumeBinderController(ctx context.Context, controllerContex NodeInformer: controllerContext.InformerFactory.Core().V1().Nodes(), EnableDynamicProvisioning: 
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning, } - volumeController, volumeControllerErr := persistentvolumecontroller.NewController(ctx, params) - if volumeControllerErr != nil { - return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr) + volumeController, err := persistentvolumecontroller.NewController(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to construct persistentvolume controller: %w", err) } - go volumeController.Run(ctx) - return nil, true, nil + + return newControllerLoop(volumeController.Run, controllerName), nil } func newPersistentVolumeAttachDetachControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.PersistentVolumeAttachDetachController, - aliases: []string{"attachdetach"}, - initFunc: startPersistentVolumeAttachDetachController, + name: names.PersistentVolumeAttachDetachController, + aliases: []string{"attachdetach"}, + constructor: newPersistentVolumeAttachDetachController, } } -func startPersistentVolumeAttachDetachController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newPersistentVolumeAttachDetachController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { logger := klog.FromContext(ctx) csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes() csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers() plugins, err := ProbeAttachableVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) if err != nil { - return nil, true, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %v", err) + return nil, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %w", err) + } + + client, err := 
controllerContext.NewClient("attachdetach-controller") + if err != nil { + return nil, err } ctx = klog.NewContext(ctx, logger) - attachDetachController, attachDetachControllerErr := - attachdetach.NewAttachDetachController( - ctx, - controllerContext.ClientBuilder.ClientOrDie("attachdetach-controller"), - controllerContext.InformerFactory.Core().V1().Pods(), - controllerContext.InformerFactory.Core().V1().Nodes(), - controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), - controllerContext.InformerFactory.Core().V1().PersistentVolumes(), - csiNodeInformer, - csiDriverInformer, - controllerContext.InformerFactory.Storage().V1().VolumeAttachments(), - plugins, - GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), - controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, - controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, - controllerContext.ComponentConfig.AttachDetachController.DisableForceDetachOnTimeout, - attachdetach.DefaultTimerConfig, - ) - if attachDetachControllerErr != nil { - return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) + attachDetachController, err := attachdetach.NewAttachDetachController( + ctx, + client, + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), + csiNodeInformer, + csiDriverInformer, + controllerContext.InformerFactory.Storage().V1().VolumeAttachments(), + plugins, + GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), + controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, + 
controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, + controllerContext.ComponentConfig.AttachDetachController.DisableForceDetachOnTimeout, + attachdetach.DefaultTimerConfig, + ) + if err != nil { + return nil, fmt.Errorf("failed to start attach/detach controller: %w", err) } - go attachDetachController.Run(ctx) - return nil, true, nil + + return newControllerLoop(attachDetachController.Run, controllerName), nil } func newPersistentVolumeExpanderControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.PersistentVolumeExpanderController, - aliases: []string{"persistentvolume-expander"}, - initFunc: startPersistentVolumeExpanderController, + name: names.PersistentVolumeExpanderController, + aliases: []string{"persistentvolume-expander"}, + constructor: newPersistentVolumeExpanderController, } } -func startPersistentVolumeExpanderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newPersistentVolumeExpanderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { logger := klog.FromContext(ctx) plugins, err := ProbeExpandableVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) if err != nil { - return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err) + return nil, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %w", err) } csiTranslator := csitrans.New() - expandController, expandControllerErr := expand.NewExpandController( + client, err := controllerContext.NewClient("expand-controller") + if err != nil { + return nil, err + } + + expandController, err := expand.NewExpandController( ctx, - controllerContext.ClientBuilder.ClientOrDie("expand-controller"), + client, 
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), plugins, csiTranslator, csimigration.NewPluginManager(csiTranslator, utilfeature.DefaultFeatureGate), ) - - if expandControllerErr != nil { - return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr) + if err != nil { + return nil, fmt.Errorf("failed to init volume expand controller: %w", err) } - go expandController.Run(ctx) - return nil, true, nil + + return newControllerLoop(expandController.Run, controllerName), nil } func newEphemeralVolumeControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.EphemeralVolumeController, - aliases: []string{"ephemeral-volume"}, - initFunc: startEphemeralVolumeController, + name: names.EphemeralVolumeController, + aliases: []string{"ephemeral-volume"}, + constructor: newEphemeralVolumeController, } } -func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("ephemeral-volume-controller") + if err != nil { + return nil, err + } + ephemeralController, err := ephemeral.NewController( ctx, - controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"), + client, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims()) if err != nil { - return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err) + return nil, fmt.Errorf("failed to init ephemeral volume controller: %w", err) } - go ephemeralController.Run(ctx, int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs)) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + 
ephemeralController.Run(ctx, int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs)) + }, controllerName), nil } const defaultResourceClaimControllerWorkers = 50 func newResourceClaimControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ResourceClaimController, - aliases: []string{"resource-claim-controller"}, - initFunc: startResourceClaimController, + name: names.ResourceClaimController, + aliases: []string{"resource-claim-controller"}, + constructor: newResourceClaimController, requiredFeatureGates: []featuregate.Feature{ features.DynamicResourceAllocation, // TODO update app.TestFeatureGatedControllersShouldNotDefineAliases when removing this feature }, } } -func startResourceClaimController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newResourceClaimController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("resource-claim-controller") + if err != nil { + return nil, err + } + ephemeralController, err := resourceclaim.NewController( klog.FromContext(ctx), resourceclaim.Features{ AdminAccess: utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList), }, - controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"), + client, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Resource().V1().ResourceClaims(), controllerContext.InformerFactory.Resource().V1().ResourceClaimTemplates()) if err != nil { - return nil, true, fmt.Errorf("failed to start resource claim controller: %v", err) + return nil, fmt.Errorf("failed to init resource claim controller: %w", err) } - go ephemeralController.Run(ctx, defaultResourceClaimControllerWorkers) - return nil, true, nil + + return 
newControllerLoop(func(ctx context.Context) { + ephemeralController.Run(ctx, defaultResourceClaimControllerWorkers) + }, controllerName), nil } func newEndpointsControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.EndpointsController, - aliases: []string{"endpoint"}, - initFunc: startEndpointsController, + name: names.EndpointsController, + aliases: []string{"endpoint"}, + constructor: newEndpointsController, } } -func startEndpointsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go endpointcontroller.NewEndpointController( +func newEndpointsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("endpoint-controller") + if err != nil { + return nil, err + } + + ec := endpointcontroller.NewEndpointController( ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Services(), controllerContext.InformerFactory.Core().V1().Endpoints(), - controllerContext.ClientBuilder.ClientOrDie("endpoint-controller"), + client, controllerContext.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, - ).Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs)) - return nil, true, nil + ) + return newControllerLoop(func(ctx context.Context) { + ec.Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs)) + }, controllerName), nil } func newReplicationControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ReplicationControllerController, - aliases: []string{"replicationcontroller"}, - initFunc: startReplicationController, + name: names.ReplicationControllerController, + aliases: []string{"replicationcontroller"}, + constructor: newReplicationController, } } -func startReplicationController(ctx 
context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go replicationcontroller.NewReplicationManager( +func newReplicationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("replication-controller") + if err != nil { + return nil, err + } + + rc := replicationcontroller.NewReplicationManager( ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().ReplicationControllers(), - controllerContext.ClientBuilder.ClientOrDie("replication-controller"), + client, replicationcontroller.BurstReplicas, - ).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) - return nil, true, nil + ) + + return newControllerLoop(func(ctx context.Context) { + rc.Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) + }, controllerName), nil } func newPodGarbageCollectorControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.PodGarbageCollectorController, - aliases: []string{"podgc"}, - initFunc: startPodGarbageCollectorController, + name: names.PodGarbageCollectorController, + aliases: []string{"podgc"}, + constructor: newPodGarbageCollectorController, } } -func startPodGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go podgc.NewPodGC( +func newPodGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("pod-garbage-collector") + if err != nil { + return nil, err + } + + pgcc := podgc.NewPodGC( ctx, - controllerContext.ClientBuilder.ClientOrDie("pod-garbage-collector"), + client, controllerContext.InformerFactory.Core().V1().Pods(), 
controllerContext.InformerFactory.Core().V1().Nodes(), int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold), - ).Run(ctx) - return nil, true, nil + ) + return newControllerLoop(pgcc.Run, controllerName), nil } func newResourceQuotaControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ResourceQuotaController, - aliases: []string{"resourcequota"}, - initFunc: startResourceQuotaController, + name: names.ResourceQuotaController, + aliases: []string{"resourcequota"}, + constructor: newResourceQuotaController, } } -func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller") - resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller") +func newResourceQuotaController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + resourceQuotaControllerClient, err := controllerContext.NewClient("resourcequota-controller") + if err != nil { + return nil, err + } + + resourceQuotaControllerDiscoveryClient, err := controllerContext.ClientBuilder.DiscoveryClient("resourcequota-controller") + if err != nil { + return nil, fmt.Errorf("failed to create the discovery client: %w", err) + } + discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources listerFuncForResource := generic.ListerFuncForResourceFunc(controllerContext.InformerFactory.ForResource) quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource) @@ -541,40 +613,54 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control } resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions) if err != nil { - return nil, 
false, err + return nil, err } - go resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs)) - // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second) - - return nil, true, nil + return newControllerLoop(concurrentRun( + func(ctx context.Context) { + resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs)) + }, + func(ctx context.Context) { + resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second) + }, + ), controllerName), nil } func newNamespaceControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.NamespaceController, - aliases: []string{"namespace"}, - initFunc: startNamespaceController, + name: names.NamespaceController, + aliases: []string{"namespace"}, + constructor: newNamespaceController, } } -func startNamespaceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newNamespaceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { // the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls // the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource // including events), takes ~10 seconds by default. 
- nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller") + nsKubeconfig, err := controllerContext.NewClientConfig("namespace-controller") + if err != nil { + return nil, err + } + nsKubeconfig.QPS *= 20 nsKubeconfig.Burst *= 100 - namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) - return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig) + + namespaceKubeClient, err := clientset.NewForConfig(nsKubeconfig) + if err != nil { + return nil, err + } + + return newModifiedNamespaceController(ctx, controllerContext, controllerName, namespaceKubeClient, nsKubeconfig) } -func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) { - +func newModifiedNamespaceController( + ctx context.Context, controllerContext ControllerContext, controllerName string, + namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config, +) (Controller, error) { metadataClient, err := metadata.NewForConfig(nsKubeconfig) if err != nil { - return nil, true, err + return nil, err } discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources @@ -588,204 +674,289 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes, ) - go namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs)) - - return nil, true, nil + return newControllerLoop(func(ctx context.Context) { + namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs)) + }, controllerName), nil } func newServiceAccountControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ServiceAccountController, - 
aliases: []string{"serviceaccount"}, - initFunc: startServiceAccountController, + name: names.ServiceAccountController, + aliases: []string{"serviceaccount"}, + constructor: newServiceAccountController, } } -func startServiceAccountController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newServiceAccountController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("service-account-controller") + if err != nil { + return nil, err + } + sac, err := serviceaccountcontroller.NewServiceAccountsController( controllerContext.InformerFactory.Core().V1().ServiceAccounts(), controllerContext.InformerFactory.Core().V1().Namespaces(), - controllerContext.ClientBuilder.ClientOrDie("service-account-controller"), + client, serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), ) if err != nil { - return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err) + return nil, fmt.Errorf("error creating ServiceAccount controller: %w", err) } - go sac.Run(ctx, 1) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + sac.Run(ctx, 1) + }, controllerName), nil } func newTTLControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.TTLController, - aliases: []string{"ttl"}, - initFunc: startTTLController, + name: names.TTLController, + aliases: []string{"ttl"}, + constructor: newTTLController, } } -func startTTLController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go ttlcontroller.NewTTLController( +func newTTLController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("ttl-controller") + if err != nil { + return nil, err + } + + ttlc := 
ttlcontroller.NewTTLController( ctx, controllerContext.InformerFactory.Core().V1().Nodes(), - controllerContext.ClientBuilder.ClientOrDie("ttl-controller"), - ).Run(ctx, 5) - return nil, true, nil + client, + ) + return newControllerLoop(func(ctx context.Context) { + ttlc.Run(ctx, 5) + }, controllerName), nil } func newGarbageCollectorControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.GarbageCollectorController, - aliases: []string{"garbagecollector"}, - initFunc: startGarbageCollectorController, + name: names.GarbageCollectorController, + aliases: []string{"garbagecollector"}, + constructor: newGarbageCollectorController, } } -func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +type garbageCollectorController struct { + *garbagecollector.GarbageCollector + controllerContext ControllerContext + controllerName string + discoveryClient discovery.DiscoveryInterface +} + +// Make sure we are propagating properly. 
+var _ controller.Debuggable = (*garbageCollectorController)(nil) + +func newGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { - return nil, false, nil + return nil, nil } - gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector") - discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector") + client, err := controllerContext.NewClient("generic-garbage-collector") + if err != nil { + return nil, err + } + + discoveryClient, err := controllerContext.ClientBuilder.DiscoveryClient("generic-garbage-collector") + if err != nil { + return nil, fmt.Errorf("failed to create the discovery client: %w", err) + } + + config, err := controllerContext.NewClientConfig("generic-garbage-collector") + if err != nil { + return nil, err + } - config := controllerContext.ClientBuilder.ConfigOrDie("generic-garbage-collector") // Increase garbage collector controller's throughput: each object deletion takes two API calls, // so to get |config.QPS| deletion rate we need to allow 2x more requests for this controller. config.QPS *= 2 metadataClient, err := metadata.NewForConfig(config) if err != nil { - return nil, true, err + return nil, err } garbageCollector, err := garbagecollector.NewComposedGarbageCollector( ctx, - gcClientset, + client, metadataClient, controllerContext.RESTMapper, controllerContext.GraphBuilder, ) if err != nil { - return nil, true, fmt.Errorf("failed to start the generic garbage collector: %w", err) + return nil, fmt.Errorf("failed to init the generic garbage collector: %w", err) } - // Start the garbage collector. 
- workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) + return &garbageCollectorController{ + GarbageCollector: garbageCollector, + controllerName: controllerName, + controllerContext: controllerContext, + discoveryClient: discoveryClient, + }, nil +} + +// Name must be implemented explicitly as it collides with the embedded controller. +func (c *garbageCollectorController) Name() string { + return c.controllerName +} + +func (c *garbageCollectorController) Run(ctx context.Context) { + workers := int(c.controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) const syncPeriod = 30 * time.Second - go garbageCollector.Run(ctx, workers, syncPeriod) - // Periodically refresh the RESTMapper with new discovery information and sync - // the garbage collector. - go garbageCollector.Sync(ctx, discoveryClient, syncPeriod) - - return garbageCollector, true, nil + concurrentRun( + func(ctx context.Context) { + c.GarbageCollector.Run(ctx, workers, syncPeriod) + }, + func(ctx context.Context) { + // Periodically refresh the RESTMapper with new discovery information and sync the garbage collector. 
+ c.Sync(ctx, c.discoveryClient, syncPeriod) + }, + )(ctx) } func newPersistentVolumeClaimProtectionControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.PersistentVolumeClaimProtectionController, - aliases: []string{"pvc-protection"}, - initFunc: startPersistentVolumeClaimProtectionController, + name: names.PersistentVolumeClaimProtectionController, + aliases: []string{"pvc-protection"}, + constructor: newPersistentVolumeClaimProtectionController, } } -func startPersistentVolumeClaimProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newPersistentVolumeClaimProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("pvc-protection-controller") + if err != nil { + return nil, err + } + pvcProtectionController, err := pvcprotection.NewPVCProtectionController( klog.FromContext(ctx), controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Core().V1().Pods(), - controllerContext.ClientBuilder.ClientOrDie("pvc-protection-controller"), + client, ) if err != nil { - return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err) + return nil, fmt.Errorf("failed to init the pvc protection controller: %w", err) } - go pvcProtectionController.Run(ctx, 1) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + pvcProtectionController.Run(ctx, 1) + }, controllerName), nil } func newPersistentVolumeProtectionControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.PersistentVolumeProtectionController, - aliases: []string{"pv-protection"}, - initFunc: startPersistentVolumeProtectionController, + name: names.PersistentVolumeProtectionController, + aliases: []string{"pv-protection"}, + constructor: 
newPersistentVolumeProtectionController, } } -func startPersistentVolumeProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go pvprotection.NewPVProtectionController( +func newPersistentVolumeProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("pv-protection-controller") + if err != nil { + return nil, err + } + + pvpc := pvprotection.NewPVProtectionController( klog.FromContext(ctx), controllerContext.InformerFactory.Core().V1().PersistentVolumes(), - controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"), - ).Run(ctx, 1) - return nil, true, nil + client, + ) + return newControllerLoop(func(ctx context.Context) { + pvpc.Run(ctx, 1) + }, controllerName), nil } func newVolumeAttributesClassProtectionControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.VolumeAttributesClassProtectionController, - initFunc: startVolumeAttributesClassProtectionController, + name: names.VolumeAttributesClassProtectionController, + constructor: newVolumeAttributesClassProtectionController, requiredFeatureGates: []featuregate.Feature{ features.VolumeAttributesClass, }, } } -func startVolumeAttributesClassProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newVolumeAttributesClassProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("volumeattributesclass-protection-controller") + if err != nil { + return nil, err + } + vacProtectionController, err := vacprotection.NewVACProtectionController( klog.FromContext(ctx), - controllerContext.ClientBuilder.ClientOrDie("volumeattributesclass-protection-controller"), + client, 
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Core().V1().PersistentVolumes(), controllerContext.InformerFactory.Storage().V1().VolumeAttributesClasses(), ) if err != nil { - return nil, true, fmt.Errorf("failed to start the vac protection controller: %w", err) + return nil, fmt.Errorf("failed to init the vac protection controller: %w", err) } - go vacProtectionController.Run(ctx, 1) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + vacProtectionController.Run(ctx, 1) + }, controllerName), nil } func newTTLAfterFinishedControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.TTLAfterFinishedController, - aliases: []string{"ttl-after-finished"}, - initFunc: startTTLAfterFinishedController, + name: names.TTLAfterFinishedController, + aliases: []string{"ttl-after-finished"}, + constructor: newTTLAfterFinishedController, } } -func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go ttlafterfinished.New( +func newTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("ttl-after-finished-controller") + if err != nil { + return nil, err + } + + ttlc := ttlafterfinished.New( ctx, controllerContext.InformerFactory.Batch().V1().Jobs(), - controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"), - ).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs)) - return nil, true, nil + client, + ) + return newControllerLoop(func(ctx context.Context) { + ttlc.Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs)) + }, controllerName), nil } func newLegacyServiceAccountTokenCleanerControllerDescriptor() *ControllerDescriptor { return 
&ControllerDescriptor{ - name: names.LegacyServiceAccountTokenCleanerController, - aliases: []string{"legacy-service-account-token-cleaner"}, - initFunc: startLegacyServiceAccountTokenCleanerController, + name: names.LegacyServiceAccountTokenCleanerController, + aliases: []string{"legacy-service-account-token-cleaner"}, + constructor: newLegacyServiceAccountTokenCleanerController, } } -func startLegacyServiceAccountTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { +func newLegacyServiceAccountTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("legacy-service-account-token-cleaner") + if err != nil { + return nil, err + } + cleanUpPeriod := controllerContext.ComponentConfig.LegacySATokenCleaner.CleanUpPeriod.Duration legacySATokenCleaner, err := serviceaccountcontroller.NewLegacySATokenCleaner( controllerContext.InformerFactory.Core().V1().ServiceAccounts(), controllerContext.InformerFactory.Core().V1().Secrets(), controllerContext.InformerFactory.Core().V1().Pods(), - controllerContext.ClientBuilder.ClientOrDie("legacy-service-account-token-cleaner"), + client, clock.RealClock{}, serviceaccountcontroller.LegacySATokenCleanerOptions{ CleanUpPeriod: cleanUpPeriod, SyncInterval: serviceaccountcontroller.DefaultCleanerSyncInterval, - }) + }, + ) if err != nil { - return nil, true, fmt.Errorf("failed to start the legacy service account token cleaner: %v", err) + return nil, fmt.Errorf("failed to init the legacy service account token cleaner: %w", err) } - go legacySATokenCleaner.Run(ctx) - return nil, true, nil + + return newControllerLoop(legacySATokenCleaner.Run, controllerName), nil } // processCIDRs is a helper function that works on a comma separated cidrs and returns @@ -906,9 +1077,9 @@ func setNodeCIDRMaskSizes(cfg 
nodeipamconfig.NodeIPAMControllerConfiguration, cl func newStorageVersionGarbageCollectorControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.StorageVersionGarbageCollectorController, - aliases: []string{"storage-version-gc"}, - initFunc: startStorageVersionGarbageCollectorController, + name: names.StorageVersionGarbageCollectorController, + aliases: []string{"storage-version-gc"}, + constructor: newStorageVersionGarbageCollectorController, requiredFeatureGates: []featuregate.Feature{ genericfeatures.APIServerIdentity, genericfeatures.StorageVersionAPI, @@ -916,20 +1087,25 @@ func newStorageVersionGarbageCollectorControllerDescriptor() *ControllerDescript } } -func startStorageVersionGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go storageversiongc.NewStorageVersionGC( +func newStorageVersionGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("storage-version-garbage-collector") + if err != nil { + return nil, err + } + + svgcc := storageversiongc.NewStorageVersionGC( ctx, - controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), + client, controllerContext.InformerFactory.Coordination().V1().Leases(), controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(), - ).Run(ctx) - return nil, true, nil + ) + return newControllerLoop(svgcc.Run, controllerName), nil } func newSELinuxWarningControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ name: names.SELinuxWarningController, - initFunc: startSELinuxWarningController, + constructor: newSELinuxWarningController, isDisabledByDefault: true, requiredFeatureGates: []featuregate.Feature{ features.SELinuxChangePolicy, @@ -937,32 +1113,34 @@ func newSELinuxWarningControllerDescriptor() 
*ControllerDescriptor { } } -func startSELinuxWarningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - if !utilfeature.DefaultFeatureGate.Enabled(features.SELinuxChangePolicy) { - return nil, false, nil +func newSELinuxWarningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient(controllerName) + if err != nil { + return nil, err } logger := klog.FromContext(ctx) csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers() plugins, err := ProbePersistentVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) if err != nil { - return nil, true, fmt.Errorf("failed to probe volume plugins when starting SELinux warning controller: %w", err) + return nil, fmt.Errorf("failed to probe volume plugins when starting SELinux warning controller: %w", err) } - seLinuxController, err := - selinuxwarning.NewController( - ctx, - controllerContext.ClientBuilder.ClientOrDie(controllerName), - controllerContext.InformerFactory.Core().V1().Pods(), - controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), - controllerContext.InformerFactory.Core().V1().PersistentVolumes(), - csiDriverInformer, - plugins, - GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), - ) + seLinuxController, err := selinuxwarning.NewController( + ctx, + client, + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), + csiDriverInformer, + plugins, + GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), + ) if err != nil { - return nil, true, fmt.Errorf("failed to start SELinux 
warning controller: %w", err) + return nil, fmt.Errorf("failed to start SELinux warning controller: %w", err) } - go seLinuxController.Run(ctx, 1) - return nil, true, nil + + return newControllerLoop(func(ctx context.Context) { + seLinuxController.Run(ctx, 1) + }, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/core_test.go b/cmd/kube-controller-manager/app/core_test.go index d66a768df89..8808ce6bf81 100644 --- a/cmd/kube-controller-manager/app/core_test.go +++ b/cmd/kube-controller-manager/app/core_test.go @@ -18,6 +18,7 @@ package app import ( "context" + "sync" "testing" "time" @@ -28,6 +29,8 @@ import ( clientset "k8s.io/client-go/kubernetes" fakeclientset "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" + + "k8s.io/kubernetes/cmd/kube-controller-manager/names" ) // TestClientBuilder inherits ClientBuilder and can accept a given fake clientset. @@ -35,12 +38,14 @@ type TestClientBuilder struct { clientset clientset.Interface } -func (TestClientBuilder) Config(name string) (*restclient.Config, error) { return nil, nil } +func (TestClientBuilder) Config(name string) (*restclient.Config, error) { + return &restclient.Config{}, nil +} func (TestClientBuilder) ConfigOrDie(name string) *restclient.Config { return &restclient.Config{} } -func (TestClientBuilder) Client(name string) (clientset.Interface, error) { return nil, nil } +func (m TestClientBuilder) Client(name string) (clientset.Interface, error) { return m.clientset, nil } func (m TestClientBuilder) ClientOrDie(name string) clientset.Interface { return m.clientset } @@ -130,26 +135,44 @@ func TestController_DiscoveryError(t *testing.T) { }, } for name, test := range tcs { - testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources} - testClientset := NewFakeClientset(testDiscovery) - testClientBuilder := TestClientBuilder{clientset: testClientset} - testInformerFactory := 
informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)) - ctx := ControllerContext{ - ClientBuilder: testClientBuilder, - InformerFactory: testInformerFactory, - ObjectOrMetadataInformerFactory: testInformerFactory, - InformersStarted: make(chan struct{}), - } - for controllerName, controllerDesc := range controllerDescriptorMap { - _, _, err := controllerDesc.GetInitFunc()(context.TODO(), ctx, controllerName) - if test.expectedErr != (err != nil) { - t.Errorf("%v test failed for use case: %v", controllerName, name) + t.Run(name, func(t *testing.T) { + ctx := context.Background() + testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources} + testClientset := NewFakeClientset(testDiscovery) + testClientBuilder := TestClientBuilder{clientset: testClientset} + testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)) + controllerContext := ControllerContext{ + ClientBuilder: testClientBuilder, + InformerFactory: testInformerFactory, + ObjectOrMetadataInformerFactory: testInformerFactory, + InformersStarted: make(chan struct{}), } - } - _, _, err := startModifiedNamespaceController( - context.TODO(), ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) - if test.expectedErr != (err != nil) { - t.Errorf("Namespace Controller test failed for use case: %v", name) - } + for controllerName, controllerDesc := range controllerDescriptorMap { + _, err := controllerDesc.GetControllerConstructor()(ctx, controllerContext, controllerName) + if test.expectedErr != (err != nil) { + t.Errorf("%v test failed for use case: %v", controllerName, name) + } + } + + namespaceController, err := newModifiedNamespaceController( + ctx, controllerContext, names.NamespaceController, + testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + 
wg.Add(1) + go func() { + defer wg.Done() + namespaceController.Run(ctx) + }() + cancel() + wg.Wait() + if test.expectedErr != (err != nil) { + t.Errorf("Namespace Controller test failed for use case: %v", name) + } + }) } } diff --git a/cmd/kube-controller-manager/app/discovery.go b/cmd/kube-controller-manager/app/discovery.go index e79fc9b2b93..f749cc58e6c 100644 --- a/cmd/kube-controller-manager/app/discovery.go +++ b/cmd/kube-controller-manager/app/discovery.go @@ -22,7 +22,6 @@ package app import ( "context" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice" endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring" @@ -30,43 +29,57 @@ import ( func newEndpointSliceControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.EndpointSliceController, - aliases: []string{"endpointslice"}, - initFunc: startEndpointSliceController, + name: names.EndpointSliceController, + aliases: []string{"endpointslice"}, + constructor: newEndpointSliceController, } } -func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go endpointslicecontroller.NewController( +func newEndpointSliceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("endpointslice-controller") + if err != nil { + return nil, err + } + + esc := endpointslicecontroller.NewController( ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Services(), controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Discovery().V1().EndpointSlices(), controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice, - 
controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"), + client, controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration, - ).Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs)) - return nil, true, nil + ) + return newControllerLoop(func(ctx context.Context) { + esc.Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs)) + }, controllerName), nil } func newEndpointSliceMirroringControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.EndpointSliceMirroringController, - aliases: []string{"endpointslicemirroring"}, - initFunc: startEndpointSliceMirroringController, + name: names.EndpointSliceMirroringController, + aliases: []string{"endpointslicemirroring"}, + constructor: newEndpointSliceMirroringController, } } -func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go endpointslicemirroringcontroller.NewController( +func newEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("endpointslicemirroring-controller") + if err != nil { + return nil, err + } + + esmc := endpointslicemirroringcontroller.NewController( ctx, controllerContext.InformerFactory.Core().V1().Endpoints(), controllerContext.InformerFactory.Discovery().V1().EndpointSlices(), controllerContext.InformerFactory.Core().V1().Services(), controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset, - controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"), + client, controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration, - ).Run(ctx, 
int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs)) - return nil, true, nil + ) + return newControllerLoop(func(ctx context.Context) { + esmc.Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs)) + }, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/networking.go b/cmd/kube-controller-manager/app/networking.go index aa1c3c9d111..ac4551d9dcd 100644 --- a/cmd/kube-controller-manager/app/networking.go +++ b/cmd/kube-controller-manager/app/networking.go @@ -23,7 +23,6 @@ import ( "context" "k8s.io/component-base/featuregate" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/servicecidrs" "k8s.io/kubernetes/pkg/features" @@ -31,20 +30,28 @@ import ( func newServiceCIDRsControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ServiceCIDRController, - initFunc: startServiceCIDRsController, + name: names.ServiceCIDRController, + constructor: newServiceCIDRsController, requiredFeatureGates: []featuregate.Feature{ features.MultiCIDRServiceAllocator, - }} + }, + } } -func startServiceCIDRsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go servicecidrs.NewController( + +func newServiceCIDRsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("service-cidrs-controller") + if err != nil { + return nil, err + } + + // TODO use component config + scc := servicecidrs.NewController( ctx, controllerContext.InformerFactory.Networking().V1().ServiceCIDRs(), controllerContext.InformerFactory.Networking().V1().IPAddresses(), - controllerContext.ClientBuilder.ClientOrDie("service-cidrs-controller"), - ).Run(ctx, 5) - // TODO use component config 
- return nil, true, nil - + client, + ) + return newControllerLoop(func(ctx context.Context) { + scc.Run(ctx, 5) + }, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 176b2a14bc1..3d794cb8644 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "net" + "time" v1 "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -108,6 +109,8 @@ type KubeControllerManagerOptions struct { Master string ShowHiddenMetricsForVersion string + ControllerShutdownTimeout time.Duration + // ComponentGlobalsRegistry is the registry where the effective versions and feature gates for all components are stored. ComponentGlobalsRegistry basecompatibility.ComponentGlobalsRegistry @@ -238,6 +241,8 @@ func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) { s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources s.Generic.LeaderElection.ResourceName = "kube-controller-manager" s.Generic.LeaderElection.ResourceNamespace = "kube-system" + + s.ControllerShutdownTimeout = 10 * time.Second return &s, nil } @@ -298,6 +303,9 @@ func (s *KubeControllerManagerOptions) Flags(allControllers []string, disabledBy fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).") fs.StringVar(&s.Generic.ClientConnection.Kubeconfig, "kubeconfig", s.Generic.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization and master location information (the master location can be overridden by the master flag).") + fss.FlagSet("generic").DurationVar(&s.ControllerShutdownTimeout, "controller-shutdown-timeout", + s.ControllerShutdownTimeout, "Time to wait for the controllers to shut down before terminating the executable") + if 
!utilfeature.DefaultFeatureGate.Enabled(featuregate.Feature(clientgofeaturegate.WatchListClient)) { ver := version.MustParse("1.34") if err := utilfeature.DefaultMutableFeatureGate.OverrideDefaultAtVersion(featuregate.Feature(clientgofeaturegate.WatchListClient), true, ver); err != nil { @@ -413,6 +421,7 @@ func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config, a return err } } + c.ControllerShutdownTimeout = s.ControllerShutdownTimeout return nil } @@ -502,11 +511,12 @@ func (s KubeControllerManagerOptions) Config(ctx context.Context, allControllers eventRecorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: KubeControllerManagerUserAgent}) c := &kubecontrollerconfig.Config{ - Client: client, - Kubeconfig: kubeconfig, - EventBroadcaster: eventBroadcaster, - EventRecorder: eventRecorder, - ComponentGlobalsRegistry: s.ComponentGlobalsRegistry, + Client: client, + Kubeconfig: kubeconfig, + EventBroadcaster: eventBroadcaster, + EventRecorder: eventRecorder, + ControllerShutdownTimeout: s.ControllerShutdownTimeout, + ComponentGlobalsRegistry: s.ComponentGlobalsRegistry, } if err := s.ApplyTo(c, allControllers, disabledByDefaultControllers, controllerAliases); err != nil { return nil, err diff --git a/cmd/kube-controller-manager/app/options/options_test.go b/cmd/kube-controller-manager/app/options/options_test.go index 8d19d2292ef..ca00e77d05a 100644 --- a/cmd/kube-controller-manager/app/options/options_test.go +++ b/cmd/kube-controller-manager/app/options/options_test.go @@ -447,9 +447,10 @@ func TestAddFlags(t *testing.T) { AlwaysAllowPaths: []string{"/healthz", "/readyz", "/livez"}, // note: this does not match /healthz/ or /healthz/* AlwaysAllowGroups: []string{"system:masters"}, }, - Master: "192.168.4.20", - Metrics: &metrics.Options{}, - Logs: logs.NewOptions(), + Master: "192.168.4.20", + ControllerShutdownTimeout: 10 * time.Second, + Metrics: &metrics.Options{}, + Logs: logs.NewOptions(), // ignores 
comparing ComponentGlobalsRegistry in this test. ComponentGlobalsRegistry: s.ComponentGlobalsRegistry, } @@ -722,6 +723,7 @@ func TestApplyTo(t *testing.T) { ConcurrentPolicySyncs: 9, }, }, + ControllerShutdownTimeout: 10 * time.Second, } // Sort GCIgnoredResources because it's built from a map, which means the diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 2c83e7e6b2a..3ed1e35120b 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -106,7 +106,7 @@ func probeControllerVolumePlugins(logger klog.Logger, config persistentvolumecon } if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, &hostPathConfig); err != nil { logger.Error(err, "Could not create hostpath recycler pod from file", "path", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath) - klog.FlushAndExit(klog.ExitFlushTimeout, 1) + return nil, err } allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(hostPathConfig)...) @@ -117,7 +117,7 @@ func probeControllerVolumePlugins(logger klog.Logger, config persistentvolumecon } if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, &nfsConfig); err != nil { logger.Error(err, "Could not create NFS recycler pod from file", "path", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS) - klog.FlushAndExit(klog.ExitFlushTimeout, 1) + return nil, err } allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...) allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) 
diff --git a/cmd/kube-controller-manager/app/plugins_providers.go b/cmd/kube-controller-manager/app/plugins_providers.go index 9293c48f79d..e76167242d3 100644 --- a/cmd/kube-controller-manager/app/plugins_providers.go +++ b/cmd/kube-controller-manager/app/plugins_providers.go @@ -29,12 +29,10 @@ import ( type probeFn func() []volume.VolumePlugin func appendPluginBasedOnFeatureFlags(logger klog.Logger, plugins []volume.VolumePlugin, inTreePluginName string, featureGate featuregate.FeatureGate, pluginInfo pluginInfo) ([]volume.VolumePlugin, error) { - _, err := csimigration.CheckMigrationFeatureFlags(featureGate, pluginInfo.pluginMigrationFeature, pluginInfo.pluginUnregisterFeature) if err != nil { logger.Error(err, "Unexpected CSI Migration Feature Flags combination detected. CSI Migration may not take effect") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - // TODO: fail and return here once alpha only tests can set the feature flags for a plugin correctly + return nil, err } // Skip appending the in-tree plugin to the list of plugins to be probed/initialized diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index 0db44ba6e4a..7a6b7c004e3 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -21,32 +21,38 @@ package app import ( "context" - "k8s.io/client-go/dynamic" "k8s.io/client-go/scale" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/disruption" ) func newDisruptionControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.DisruptionController, - aliases: []string{"disruption"}, - initFunc: startDisruptionController, + name: names.DisruptionController, + aliases: []string{"disruption"}, + constructor: newDisruptionController, } } -func startDisruptionController(ctx context.Context, controllerContext ControllerContext, controllerName string) 
(controller.Interface, bool, error) { - client := controllerContext.ClientBuilder.ClientOrDie("disruption-controller") - config := controllerContext.ClientBuilder.ConfigOrDie("disruption-controller") +func newDisruptionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("disruption-controller") + if err != nil { + return nil, err + } + + config, err := controllerContext.NewClientConfig("disruption-controller") + if err != nil { + return nil, err + } + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery()) scaleClient, err := scale.NewForConfig(config, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { - return nil, false, err + return nil, err } - go disruption.NewDisruptionController( + dc := disruption.NewDisruptionController( ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(), @@ -58,6 +64,6 @@ func startDisruptionController(ctx context.Context, controllerContext Controller controllerContext.RESTMapper, scaleClient, client.Discovery(), - ).Run(ctx) - return nil, true, nil + ) + return newControllerLoop(dc.Run, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/rbac.go b/cmd/kube-controller-manager/app/rbac.go index c63c61987b4..465ddba6bba 100644 --- a/cmd/kube-controller-manager/app/rbac.go +++ b/cmd/kube-controller-manager/app/rbac.go @@ -19,23 +19,29 @@ package app import ( "context" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/clusterroleaggregation" ) func newClusterRoleAggregrationControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.ClusterRoleAggregationController, - aliases: []string{"clusterrole-aggregation"}, - initFunc: startClusterRoleAggregationController, + 
name: names.ClusterRoleAggregationController, + aliases: []string{"clusterrole-aggregation"}, + constructor: newClusterRoleAggregationController, } } -func startClusterRoleAggregationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - go clusterroleaggregation.NewClusterRoleAggregation( +func newClusterRoleAggregationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + client, err := controllerContext.NewClient("clusterrole-aggregation-controller") + if err != nil { + return nil, err + } + + crac := clusterroleaggregation.NewClusterRoleAggregation( controllerContext.InformerFactory.Rbac().V1().ClusterRoles(), - controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), - ).Run(ctx, 5) - return nil, true, nil + client.RbacV1(), + ) + return newControllerLoop(func(ctx context.Context) { + crac.Run(ctx, 5) + }, controllerName), nil } diff --git a/cmd/kube-controller-manager/app/service_accounts.go b/cmd/kube-controller-manager/app/service_accounts.go new file mode 100644 index 00000000000..aa5bd9d9409 --- /dev/null +++ b/cmd/kube-controller-manager/app/service_accounts.go @@ -0,0 +1,98 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package app + +import ( + "context" + "fmt" + + "k8s.io/client-go/util/keyutil" + "k8s.io/controller-manager/pkg/clientbuilder" + "k8s.io/klog/v2" + "k8s.io/kubernetes/cmd/kube-controller-manager/names" + serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + "k8s.io/kubernetes/pkg/serviceaccount" +) + +// serviceAccountTokenController is special because it must run first to set up permissions for other controllers. +// It cannot use the "normal" client builder, so it tracks its own. +func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor { + return &ControllerDescriptor{ + name: names.ServiceAccountTokenController, + aliases: []string{"serviceaccount-token"}, + constructor: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + return newServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder) + }, + // This controller is started manually before any other controller. 
+ requiresSpecialHandling: true, + } +} + +func newServiceAccountTokenController( + ctx context.Context, controllerContext ControllerContext, controllerName string, + rootClientBuilder clientbuilder.ControllerClientBuilder, +) (Controller, error) { + if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 { + klog.FromContext(ctx).Info("Controller is disabled because there is no private key", "controller", controllerName) + return nil, nil + } + + privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) + if err != nil { + return nil, fmt.Errorf("error reading key for service account token controller: %w", err) + } + + var rootCA []byte + if controllerContext.ComponentConfig.SAController.RootCAFile != "" { + if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil { + return nil, fmt.Errorf("error parsing root-ca-file at %s: %w", controllerContext.ComponentConfig.SAController.RootCAFile, err) + } + } else { + config, err := rootClientBuilder.Config("tokens-controller") + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client config for %q: %w", "tokens-controller", err) + } + rootCA = config.CAData + } + + client, err := rootClientBuilder.Client("tokens-controller") + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client for %q: %w", "tokens-controller", err) + } + + tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey) + if err != nil { + return nil, fmt.Errorf("failed to build token generator: %w", err) + } + tokenController, err := serviceaccountcontroller.NewTokensController( + klog.FromContext(ctx), + controllerContext.InformerFactory.Core().V1().ServiceAccounts(), + controllerContext.InformerFactory.Core().V1().Secrets(), + client, + serviceaccountcontroller.TokensControllerOptions{ + TokenGenerator: tokenGenerator, + RootCA: rootCA, + }, + ) + if err 
!= nil { + return nil, fmt.Errorf("error creating Tokens controller: %w", err) + } + + return newControllerLoop(func(ctx context.Context) { + tokenController.Run(ctx, int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs)) + }, controllerName), nil +} diff --git a/cmd/kube-controller-manager/app/storageversionmigrator.go b/cmd/kube-controller-manager/app/storageversionmigrator.go index 4ff1b57cdbe..e7dc1b67ab4 100644 --- a/cmd/kube-controller-manager/app/storageversionmigrator.go +++ b/cmd/kube-controller-manager/app/storageversionmigrator.go @@ -20,82 +20,87 @@ import ( "context" "fmt" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" - "k8s.io/client-go/metadata" - "k8s.io/controller-manager/controller" - "k8s.io/kubernetes/cmd/kube-controller-manager/names" - "k8s.io/kubernetes/pkg/features" - - utilfeature "k8s.io/apiserver/pkg/util/feature" clientgofeaturegate "k8s.io/client-go/features" + "k8s.io/client-go/metadata" + "k8s.io/kubernetes/cmd/kube-controller-manager/names" svm "k8s.io/kubernetes/pkg/controller/storageversionmigrator" + "k8s.io/kubernetes/pkg/features" ) func newStorageVersionMigratorControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ - name: names.StorageVersionMigratorController, - aliases: []string{"svm"}, - initFunc: startSVMController, + name: names.StorageVersionMigratorController, + aliases: []string{"svm"}, + constructor: newSVMController, } } -func startSVMController( - ctx context.Context, - controllerContext ControllerContext, - controllerName string, -) (controller.Interface, bool, error) { +func newSVMController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionMigrator) || !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { - return nil, false, nil + return nil, nil } if 
!controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { - return nil, true, fmt.Errorf("storage version migrator requires garbage collector") + return nil, fmt.Errorf("storage version migrator requires garbage collector") } if !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { err := fmt.Errorf("storage version migrator requires the InOrderInformers feature gate to be enabled") - return nil, true, err + return nil, err } // svm controller can make a lot of requests during migration, keep it fast - config := controllerContext.ClientBuilder.ConfigOrDie(controllerName) + config, err := controllerContext.NewClientConfig(controllerName) + if err != nil { + return nil, err + } + config.QPS *= 20 config.Burst *= 100 - client := controllerContext.ClientBuilder.ClientOrDie(controllerName) + client, err := controllerContext.NewClient(controllerName) + if err != nil { + return nil, err + } + informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations() dynamicClient, err := dynamic.NewForConfig(config) if err != nil { - return nil, false, err + return nil, err } discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { - return nil, false, err + return nil, err } - go svm.NewResourceVersionController( - ctx, - client, - discoveryClient, - metadata.NewForConfigOrDie(config), - informer, - controllerContext.RESTMapper, - ).Run(ctx) + metaClient, err := metadata.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create metadata client for %s: %w", controllerName, err) + } - svmController := svm.NewSVMController( - ctx, - client, - dynamicClient, - informer, - controllerName, - controllerContext.RESTMapper, - controllerContext.GraphBuilder, - ) - go svmController.Run(ctx) - - return svmController, true, nil + return newControllerLoop(concurrentRun( + svm.NewSVMController( + ctx, + client, + dynamicClient, + informer, + 
controllerName, + controllerContext.RESTMapper, + controllerContext.GraphBuilder, + ).Run, + svm.NewResourceVersionController( + ctx, + client, + discoveryClient, + metaClient, + informer, + controllerContext.RESTMapper, + ).Run, + ), controllerName), nil } diff --git a/cmd/kube-controller-manager/app/validatingadmissionpolicystatus.go b/cmd/kube-controller-manager/app/validatingadmissionpolicystatus.go index ce6e58f693b..4ec1431658c 100644 --- a/cmd/kube-controller-manager/app/validatingadmissionpolicystatus.go +++ b/cmd/kube-controller-manager/app/validatingadmissionpolicystatus.go @@ -18,13 +18,13 @@ package app import ( "context" + "fmt" apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme" pluginvalidatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating" "k8s.io/apiserver/pkg/cel/openapi/resolver" k8sscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/component-base/featuregate" - "k8s.io/controller-manager/controller" "k8s.io/kubernetes/cmd/kube-controller-manager/names" "k8s.io/kubernetes/pkg/controller/validatingadmissionpolicystatus" "k8s.io/kubernetes/pkg/generated/openapi" @@ -33,27 +33,40 @@ import ( func newValidatingAdmissionPolicyStatusControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ name: names.ValidatingAdmissionPolicyStatusController, - initFunc: startValidatingAdmissionPolicyStatusController, + constructor: newValidatingAdmissionPolicyStatusController, requiredFeatureGates: []featuregate.Feature{}, } } -func startValidatingAdmissionPolicyStatusController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { - // KCM won't start the controller without the feature gate set. 
+func newValidatingAdmissionPolicyStatusController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) { + discoveryClient, err := controllerContext.ClientBuilder.DiscoveryClient(names.ValidatingAdmissionPolicyStatusController) + if err != nil { + return nil, fmt.Errorf("failed to create discovery client for %s: %w", controllerName, err) + } schemaResolver := resolver.NewDefinitionsSchemaResolver(openapi.GetOpenAPIDefinitions, k8sscheme.Scheme, apiextensionsscheme.Scheme). - Combine(&resolver.ClientDiscoveryResolver{Discovery: controllerContext.ClientBuilder.DiscoveryClientOrDie(names.ValidatingAdmissionPolicyStatusController)}) + Combine(&resolver.ClientDiscoveryResolver{Discovery: discoveryClient}) typeChecker := &pluginvalidatingadmissionpolicy.TypeChecker{ SchemaResolver: schemaResolver, RestMapper: controllerContext.RESTMapper, } + + client, err := controllerContext.NewClient(names.ValidatingAdmissionPolicyStatusController) + if err != nil { + return nil, err + } + c, err := validatingadmissionpolicystatus.NewController( controllerContext.InformerFactory.Admissionregistration().V1().ValidatingAdmissionPolicies(), - controllerContext.ClientBuilder.ClientOrDie(names.ValidatingAdmissionPolicyStatusController).AdmissionregistrationV1().ValidatingAdmissionPolicies(), + client.AdmissionregistrationV1().ValidatingAdmissionPolicies(), typeChecker, ) + if err != nil { + return nil, err + } - go c.Run(ctx, int(controllerContext.ComponentConfig.ValidatingAdmissionPolicyStatusController.ConcurrentPolicySyncs)) - return nil, true, err + return newControllerLoop(func(ctx context.Context) { + c.Run(ctx, int(controllerContext.ComponentConfig.ValidatingAdmissionPolicyStatusController.ConcurrentPolicySyncs)) + }, controllerName), nil }