From f18f0df7feb87b60e6e31ae08009ee4d433ff9e2 Mon Sep 17 00:00:00 2001 From: Michael Aspinwall Date: Tue, 24 Feb 2026 00:26:11 +0000 Subject: [PATCH] Add the ability for the replicaset controller to read its own writes --- pkg/controller/replicaset/metrics/metrics.go | 37 +++++++--- pkg/controller/replicaset/replica_set.go | 69 ++++++++++++++++++- .../replication/replication_controller.go | 5 ++ pkg/features/kube_features.go | 13 ++++ .../reference/feature_list.md | 1 + .../reference/versioned_feature_list.yaml | 6 ++ 6 files changed, 119 insertions(+), 12 deletions(-) diff --git a/pkg/controller/replicaset/metrics/metrics.go b/pkg/controller/replicaset/metrics/metrics.go index 8fe43fce23c..e4e43340ee3 100644 --- a/pkg/controller/replicaset/metrics/metrics.go +++ b/pkg/controller/replicaset/metrics/metrics.go @@ -22,19 +22,34 @@ import ( const ReplicaSetControllerSubsystem = "replicaset_controller" -var SortingDeletionAgeRatio = metrics.NewHistogram( - &metrics.HistogramOpts{ - Subsystem: ReplicaSetControllerSubsystem, - Name: "sorting_deletion_age_ratio", - Help: "The ratio of chosen deleted pod's ages to the current youngest pod's age (at the time). Should be <2. " + - "The intent of this metric is to measure the rough efficacy of the LogarithmicScaleDown feature gate's effect on " + - "the sorting (and deletion) of pods when a replicaset scales down. This only considers Ready pods when calculating and reporting.", - Buckets: metrics.ExponentialBuckets(0.25, 2, 6), - StabilityLevel: metrics.ALPHA, - }, +var ( + SortingDeletionAgeRatio = metrics.NewHistogram( + &metrics.HistogramOpts{ + Subsystem: ReplicaSetControllerSubsystem, + Name: "sorting_deletion_age_ratio", + Help: "The ratio of chosen deleted pod's ages to the current youngest pod's age (at the time). Should be <2. " + + "The intent of this metric is to measure the rough efficacy of the LogarithmicScaleDown feature gate's effect on " + + "the sorting (and deletion) of pods when a replicaset scales down. This only considers Ready pods when calculating and reporting.", + Buckets: metrics.ExponentialBuckets(0.25, 2, 6), + StabilityLevel: metrics.ALPHA, + }, + ) + + ReplicaSetRequeueSkips = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: ReplicaSetControllerSubsystem, + Name: "stale_sync_skips_total", + Help: "Total number of ReplicaSet syncs skipped due to a stale watch cache.", + StabilityLevel: metrics.ALPHA, + }, + []string{"group", "resource"}, + ) ) // Register registers ReplicaSet controller metrics. func Register(registrationFunc func(metrics.Registerable) error) error { - return registrationFunc(SortingDeletionAgeRatio) + if err := registrationFunc(SortingDeletionAgeRatio); err != nil { + return err + } + return registrationFunc(ReplicaSetRequeueSkips) } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index d29bdca2480..38285949639 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -29,6 +29,7 @@ package replicaset import ( "context" + "errors" "fmt" "reflect" "sort" @@ -61,6 +62,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/replicaset/metrics" + consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/clock" "k8s.io/utils/ptr" @@ -79,6 +81,17 @@ const ( controllerUIDIndex = "controllerUID" ) +var ( + replicaSetGroupResource = schema.GroupResource{ + Group: "apps", + Resource: "replicasets", + } + podGroupResource = schema.GroupResource{ + Group: "", + Resource: "pods", + } +) + // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored // in the system with actual running pods. type ReplicaSetController struct { @@ -120,6 +133,8 @@ type ReplicaSetController struct { clock clock.PassiveClock + consistencyStore consistencyutil.ConsistencyStore + // Controller specific features; see ReplicaSetControllerFeatures for details. controllerFeatures ReplicaSetControllerFeatures } @@ -143,6 +158,32 @@ func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.Repli if err := metrics.Register(legacyregistry.Register); err != nil { logger.Error(err, "unable to register metrics") } + + var consistencyStore consistencyutil.ConsistencyStore + var podWriteCallback func(pod *v1.Pod, rs *metav1.OwnerReference) + if utilfeature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyReplicaSet) { + consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{ + podGroupResource: podInformer.Informer().GetStore(), + replicaSetGroupResource: rsInformer.Informer().GetStore(), + }) + podWriteCallback = func(pod *v1.Pod, rs *metav1.OwnerReference) { + if rs == nil { + return + } + consistencyStore.WroteAt( + types.NamespacedName{ + Namespace: pod.Namespace, + Name: rs.Name, + }, + rs.UID, + podGroupResource, + pod.ResourceVersion, + ) + } + } else { + consistencyStore = consistencyutil.NewNoopConsistencyStore() + } + return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas, apps.SchemeGroupVersion.WithKind("ReplicaSet"), "replicaset_controller", @@ -150,16 +191,18 @@ func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.Repli controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}), + OnWrite: podWriteCallback, }, eventBroadcaster, DefaultReplicaSetControllerFeatures(), + consistencyStore, ) } // NewBaseController is the implementation of NewReplicaSetController with additional injected // parameters so that it can also serve as the implementation of NewReplicationController. func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, - gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster, controllerFeatures ReplicaSetControllerFeatures) *ReplicaSetController { + gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster, controllerFeatures ReplicaSetControllerFeatures, consistencyStore consistencyutil.ConsistencyStore) *ReplicaSetController { rsc := &ReplicaSetController{ GroupVersionKind: gvk, @@ -174,6 +217,7 @@ func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetIn ), clock: clock.RealClock{}, controllerFeatures: controllerFeatures, + consistencyStore: consistencyStore, } rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -405,6 +449,10 @@ func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) { logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs)) + rsc.consistencyStore.Clear(types.NamespacedName{ + Namespace: rs.Namespace, + Name: rs.Name, + }, rs.UID) // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean rsc.expectations.DeleteExpectations(logger, key) @@ -715,9 +763,22 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) if err != nil { return err } + rsNamespacedName := types.NamespacedName{Namespace: namespace, Name: name} + if err := rsc.consistencyStore.EnsureReady(rsNamespacedName); err != nil { + var consistencyErr *consistencyutil.ConsistencyError + if errors.As(err, &consistencyErr) { + metrics.ReplicaSetRequeueSkips.WithLabelValues( + consistencyErr.GroupResource.Group, + consistencyErr.GroupResource.Resource, + ).Inc() + } + return err + } + rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) if apierrors.IsNotFound(err) { logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key) + rsc.consistencyStore.Clear(rsNamespacedName, "") rsc.expectations.DeleteExpectations(logger, key) return nil } @@ -769,6 +830,12 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) // Returning an error causes a requeue without forcing a hotloop return err } + rsc.consistencyStore.WroteAt( + types.NamespacedName{Name: rs.Name, Namespace: rs.Namespace}, + rs.UID, + replicaSetGroupResource, + updatedRS.ResourceVersion, + ) if manageReplicasErr != nil { return manageReplicasErr } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index aa6902f747e..beb52b40554 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/replicaset" + consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency" ) const ( @@ -68,6 +69,10 @@ func NewReplicationManager(ctx context.Context, podInformer coreinformers.PodInf // ReplicaSets do support this field, which is then propagated to Deployments for higher-level features. EnableStatusTerminatingReplicas: false, }, + // TODO: Replication controller does not currently support stale controller consistency. + // In order to support stale controller consistency, we would need to parameterize the metrics + // and resource types passed to the consistency store. + consistencyutil.NewNoopConsistencyStore(), ), } } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 503eb3d3f16..98bc2fce908 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -941,6 +941,13 @@ const ( // prior to running a reconcile on the same object. StaleControllerConsistencyJob featuregate.Feature = "StaleControllerConsistencyJob" + // owner: @michaelasp + // kep: http://kep.k8s.io/5647 + // + // Introduces the ability for the ReplicaSet controller to be able to read its writes + // prior to running a reconcile on the same object. + StaleControllerConsistencyReplicaSet featuregate.Feature = "StaleControllerConsistencyReplicaSet" + // owner: @liggitt // // Mitigates spurious statefulset rollouts due to controller revision comparison mismatches @@ -1791,6 +1798,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, }, + StaleControllerConsistencyReplicaSet: { + {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, + }, + StatefulSetSemanticRevisionComparison: { // This is a mitigation for a 1.34 regression due to serialization differences that cannot be feature-gated, // so this mitigation should not auto-disable even if emulating versions prior to 1.34 with --emulation-version. @@ -2416,6 +2427,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature StaleControllerConsistencyJob: {featuregate.Feature(clientfeatures.AtomicFIFO)}, + StaleControllerConsistencyReplicaSet: {featuregate.Feature(clientfeatures.AtomicFIFO)}, + StatefulSetSemanticRevisionComparison: {}, StorageCapacityScoring: {}, diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index 6f069b04ece..7cab7332940 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -180,6 +180,7 @@ | SizeBasedListCostEstimate | :ballot_box_with_check: 1.34+ | | | 1.34– | | | | [code](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StaleControllerConsistencyDaemonSet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StaleControllerConsistencyJob | :ballot_box_with_check: 1.36+ | | | 1.36– | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| StaleControllerConsistencyReplicaSet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyReplicaSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyReplicaSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StatefulSetSemanticRevisionComparison | :ballot_box_with_check: 1.0+ | | | 1.0– | | | | [code](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StorageCapacityScoring | | | 1.33– | | | | | [code](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StorageNamespaceIndex | :ballot_box_with_check: 1.30+ | | | 1.30–1.32 | | 1.33– | | [code](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index 07fc008a61e..9f0c4545e86 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1737,6 +1737,12 @@ lockToDefault: false preRelease: Beta version: "1.36" +- name: StaleControllerConsistencyReplicaSet + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.36" - name: StatefulSetSemanticRevisionComparison versionedSpecs: - default: true