diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index e3421f30c5e..dd5268ae5a9 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -557,7 +557,7 @@ func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespac func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error { pod, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) - if err != nil && r.OnWrite != nil { + if err == nil && r.OnWrite != nil { ownerRef := metav1.GetControllerOfNoCopy(pod) r.OnWrite(pod, ownerRef) } diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index 63a7e2768ae..fb8b858388c 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -378,13 +378,18 @@ func TestCreatePodsWithGenerateName(t *testing.T) { defer testServer.Close() clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}, ContentType: runtime.ContentTypeJSON}}) + callbackCalled := false podControl := RealPodControl{ KubeClient: clientset, Recorder: &record.FakeRecorder{}, + OnWrite: func(*v1.Pod, *metav1.OwnerReference) { + callbackCalled = true + }, } err := test.podCreationFunc(podControl) require.NoError(t, err, "unexpected error: %v", err) + assert.True(t, callbackCalled, "OnWrite callback was not called") fakeHandler.ValidateRequest(t, "/api/v1/namespaces/default/pods", "POST", nil) var actualPod = &v1.Pod{} @@ -396,6 +401,32 @@ func TestCreatePodsWithGenerateName(t *testing.T) { } } +func TestPatchPodCallbacks(t *testing.T) { + fakeClient := fake.NewSimpleClientset( + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + }, + }, + ) + wroteCallbackCalled := false + podControl := RealPodControl{ + KubeClient: fakeClient, + Recorder: &record.FakeRecorder{}, + OnWrite: func(pod *v1.Pod, ownerRef *metav1.OwnerReference) { + wroteCallbackCalled = true + }, + } + + err := podControl.PatchPod(context.TODO(), "", "non-existing-pod", []byte("{}")) + assert.False(t, wroteCallbackCalled, "OnWrite callback was called when not expected") + assert.True(t, apierrors.IsNotFound(err), "Expected not found error") + + err = podControl.PatchPod(context.TODO(), "", "test-pod", []byte("{}")) + assert.True(t, wroteCallbackCalled, "OnWrite callback was not called") + assert.NoError(t, err, "Expected no error") +} + func TestDeletePodsAllowsMissing(t *testing.T) { fakeClient := fake.NewSimpleClientset() podControl := RealPodControl{ diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 04d6e334c12..1f399c1787e 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -56,6 +56,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/daemon/metrics" "k8s.io/kubernetes/pkg/controller/daemon/util" + consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency" "k8s.io/kubernetes/pkg/features" ) @@ -150,7 +151,7 @@ type DaemonSetsController struct { failedPodsBackoff *flowcontrol.Backoff - consistencyStore util.ConsistencyStore + consistencyStore consistencyutil.ConsistencyStore } // NewDaemonSetsController creates a new DaemonSetsController @@ -165,14 +166,17 @@ func NewDaemonSetsController( ) (*DaemonSetsController, error) { eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) logger := klog.FromContext(ctx) - var consistencyStore util.ConsistencyStore + var consistencyStore consistencyutil.ConsistencyStore var podWriteCallback func(pod *v1.Pod, ds *metav1.OwnerReference) if utilfeature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyDaemonSet) { - consistencyStore = util.NewConsistencyStore(map[schema.GroupResource]util.LastSyncRVGetter{ + consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{ podGroupResource: podInformer.Informer().GetStore(), daemonsetGroupResource: daemonSetInformer.Informer().GetStore(), }) podWriteCallback = func(pod *v1.Pod, ds *metav1.OwnerReference) { + if ds == nil { + return + } consistencyStore.WroteAt( types.NamespacedName{ Namespace: pod.Namespace, @@ -184,7 +188,7 @@ func NewDaemonSetsController( ) } } else { - consistencyStore = util.NewNoopConsistencyStore() + consistencyStore = consistencyutil.NewNoopConsistencyStore() } dsc := &DaemonSetsController{ @@ -1282,7 +1286,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) // continue since our reads will not be in sync with our previously enacted // state. if err := dsc.consistencyStore.EnsureReady(dsNamespacedName); err != nil { - var consistencyErr *util.ConsistencyError + var consistencyErr *consistencyutil.ConsistencyError if errors.As(err, &consistencyErr) { metrics.DaemonsetRequeueSkips.WithLabelValues( consistencyErr.GroupResource.Group, diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1ace380fc12..5e90c84b4f4 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -18,6 +18,7 @@ package job import ( "context" + "errors" "fmt" "reflect" "sort" @@ -30,6 +31,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -51,6 +53,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/controller/job/util" + consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/clock" "k8s.io/utils/ptr" @@ -130,6 +133,9 @@ type Controller struct { // finishedJobExpectations contains the job ids for which the job status is finished // but the corresponding event is not yet received. finishedJobExpectations sync.Map + + // consistencyStore stores information about the consistency of the job. + consistencyStore consistencyutil.ConsistencyStore } type syncJobCtx struct { @@ -167,6 +173,17 @@ type orphanPodKey struct { value string } +var ( + jobGroupResource = schema.GroupResource{ + Group: "batch", + Resource: "jobs", + } + podGroupResource = schema.GroupResource{ + Group: "", + Resource: "pods", + } +) + // NewController creates a new Job controller that keeps the relevant pods // in sync with their corresponding Job objects. func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) { @@ -177,11 +194,37 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) logger := klog.FromContext(ctx) + var consistencyStore consistencyutil.ConsistencyStore + var podWriteCallback func(pod *v1.Pod, ds *metav1.OwnerReference) + if feature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyJob) { + consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{ + podGroupResource: podInformer.Informer().GetStore(), + jobGroupResource: jobInformer.Informer().GetStore(), + }) + podWriteCallback = func(pod *v1.Pod, job *metav1.OwnerReference) { + if job == nil { + return + } + consistencyStore.WroteAt( + types.NamespacedName{ + Namespace: pod.Namespace, + Name: job.Name, + }, + job.UID, + podGroupResource, + pod.ResourceVersion, + ) + } + } else { + consistencyStore = consistencyutil.NewNoopConsistencyStore() + } + jm := &Controller{ kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), + OnWrite: podWriteCallback, }, expectations: controller.NewControllerExpectations(), finalizerExpectations: newUIDTrackingExpectations(), @@ -192,6 +235,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn clock: clock, podBackoffStore: newBackoffStore(), finishedJobExpectations: sync.Map{}, + consistencyStore: consistencyStore, } if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -832,12 +876,28 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if len(ns) == 0 || len(name) == 0 { return fmt.Errorf("invalid job key %q: either namespace or name is missing", key) } + jobNamespacedName := types.NamespacedName{Namespace: ns, Name: name} + // If our writes have not yet been observed by our informers, we should not + // continue since our reads will not be in sync with our previously enacted + // state. + if err := jm.consistencyStore.EnsureReady(jobNamespacedName); err != nil { + var consistencyErr *consistencyutil.ConsistencyError + if errors.As(err, &consistencyErr) { + metrics.JobRequeueSkips.WithLabelValues( + consistencyErr.GroupResource.Group, + consistencyErr.GroupResource.Resource, + ).Inc() + } + return err + } + sharedJob, err := jm.jobLister.Jobs(ns).Get(name) if err != nil { if apierrors.IsNotFound(err) { logger.V(4).Info("Job has been deleted", "key", key) jm.expectations.DeleteExpectations(logger, key) jm.finalizerExpectations.deleteExpectations(logger, key) + jm.consistencyStore.Clear(jobNamespacedName, "") err := jm.podBackoffStore.removeBackoffRecord(key) if err != nil { @@ -1889,12 +1949,31 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P // updateJobStatus calls the API to update the job status. func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) { - return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{}) + job, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + jm.consistencyStore.WroteAt( + types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, + job.UID, + jobGroupResource, + job.ResourceVersion, + ) + return job, err } func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error { - _, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch( + job, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch( ctx, job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) + if err != nil { + return err + } + jm.consistencyStore.WroteAt( + types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, + job.UID, + jobGroupResource, + job.ResourceVersion, + ) return err } diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index a52067ada7f..2d59ae5aa3f 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -159,6 +159,19 @@ Possible values of the "reason" label are: Possible values of the "status" label are: "succeeded", "failed".`, }, []string{"reason", "status"}) + + // JobRequeueSkips track the number of job syncs skipped due to a stale + // watch cache. + JobRequeueSkips = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: JobControllerSubsystem, + Name: "stale_sync_skips_total", + Help: "Total number of Job syncs skipped due to a stale watch cache.", + StabilityLevel: metrics.ALPHA, + }, + // These are the labels (dimensions) + []string{"group", "resource"}, + ) ) const ( @@ -212,5 +225,6 @@ func Register() { legacyregistry.MustRegister(JobFinishedIndexesTotal) legacyregistry.MustRegister(JobPodsCreationTotal) legacyregistry.MustRegister(JobByExternalControllerTotal) + legacyregistry.MustRegister(JobRequeueSkips) }) } diff --git a/pkg/controller/daemon/util/consistency.go b/pkg/controller/util/consistency/consistency.go similarity index 99% rename from pkg/controller/daemon/util/consistency.go rename to pkg/controller/util/consistency/consistency.go index 12b7403cfe0..7c727fb21b2 100644 --- a/pkg/controller/daemon/util/consistency.go +++ b/pkg/controller/util/consistency/consistency.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package consistency import ( "fmt" diff --git a/pkg/controller/daemon/util/consistency_test.go b/pkg/controller/util/consistency/consistency_test.go similarity index 99% rename from pkg/controller/daemon/util/consistency_test.go rename to pkg/controller/util/consistency/consistency_test.go index 6915145ba62..114152da375 100644 --- a/pkg/controller/daemon/util/consistency_test.go +++ b/pkg/controller/util/consistency/consistency_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package consistency import ( "sync" diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index fd89103983f..66419ad100e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -942,6 +942,13 @@ const ( // prior to running a reconcile on the same object. StaleControllerConsistencyDaemonSet featuregate.Feature = "StaleControllerConsistencyDaemonSet" + // owner: @michaelasp + // kep: http://kep.k8s.io/5647 + // + // Introduces the ability for the Job controller to be able to read its writes + // prior to running a reconcile on the same object. + StaleControllerConsistencyJob featuregate.Feature = "StaleControllerConsistencyJob" + // owner: @liggitt // // Mitigates spurious statefulset rollouts due to controller revision comparison mismatches @@ -1786,6 +1793,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, }, + StaleControllerConsistencyJob: { + {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, + }, + StatefulSetSemanticRevisionComparison: { // This is a mitigation for a 1.34 regression due to serialization differences that cannot be feature-gated, // so this mitigation should not auto-disable even if emulating versions prior to 1.34 with --emulation-version. @@ -2407,6 +2418,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature StaleControllerConsistencyDaemonSet: {featuregate.Feature(clientfeatures.AtomicFIFO)}, + StaleControllerConsistencyJob: {featuregate.Feature(clientfeatures.AtomicFIFO)}, + StatefulSetSemanticRevisionComparison: {}, StorageCapacityScoring: {}, diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index 76af12ac2d1..b47afa57b35 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -180,6 +180,7 @@ | SidecarContainers | :ballot_box_with_check: 1.29+ | :closed_lock_with_key: 1.33+ | 1.28 | 1.29–1.32 | 1.33– | | | [code](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | SizeBasedListCostEstimate | :ballot_box_with_check: 1.34+ | | | 1.34– | | | | [code](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StaleControllerConsistencyDaemonSet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| StaleControllerConsistencyJob | :ballot_box_with_check: 1.36+ | | | 1.36– | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StatefulSetSemanticRevisionComparison | :ballot_box_with_check: 1.0+ | | | 1.0– | | | | [code](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StorageCapacityScoring | | | 1.33– | | | | | [code](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StorageNamespaceIndex | :ballot_box_with_check: 1.30+ | | | 1.30–1.32 | | 1.33– | | [code](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index ed916ccbb12..a02f00dfe3f 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1741,6 +1741,12 @@ lockToDefault: false preRelease: Beta version: "1.36" +- name: StaleControllerConsistencyJob + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.36" - name: StatefulSetSemanticRevisionComparison versionedSpecs: - default: true