Add the ability for the job controller to read its own writes

This commit is contained in:
Michael Aspinwall 2026-02-24 00:46:26 +00:00
parent b5c9cada55
commit 61d0dd30fb
10 changed files with 158 additions and 10 deletions

View file

@ -557,7 +557,7 @@ func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespac
func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error {
pod, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
if err != nil && r.OnWrite != nil {
if err == nil && r.OnWrite != nil {
ownerRef := metav1.GetControllerOfNoCopy(pod)
r.OnWrite(pod, ownerRef)
}

View file

@ -378,13 +378,18 @@ func TestCreatePodsWithGenerateName(t *testing.T) {
defer testServer.Close()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}, ContentType: runtime.ContentTypeJSON}})
callbackCalled := false
podControl := RealPodControl{
KubeClient: clientset,
Recorder: &record.FakeRecorder{},
OnWrite: func(*v1.Pod, *metav1.OwnerReference) {
callbackCalled = true
},
}
err := test.podCreationFunc(podControl)
require.NoError(t, err, "unexpected error: %v", err)
assert.True(t, callbackCalled, "OnWrite callback was not called")
fakeHandler.ValidateRequest(t, "/api/v1/namespaces/default/pods", "POST", nil)
var actualPod = &v1.Pod{}
@ -396,6 +401,32 @@ func TestCreatePodsWithGenerateName(t *testing.T) {
}
}
func TestPatchPodCallbacks(t *testing.T) {
fakeClient := fake.NewSimpleClientset(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
},
},
)
wroteCallbackCalled := false
podControl := RealPodControl{
KubeClient: fakeClient,
Recorder: &record.FakeRecorder{},
OnWrite: func(pod *v1.Pod, ownerRef *metav1.OwnerReference) {
wroteCallbackCalled = true
},
}
err := podControl.PatchPod(context.TODO(), "", "non-existing-pod", []byte("{}"))
assert.False(t, wroteCallbackCalled, "OnWrite callback was called when not expected")
assert.True(t, apierrors.IsNotFound(err), "Expected not found error")
err = podControl.PatchPod(context.TODO(), "", "test-pod", []byte("{}"))
assert.True(t, wroteCallbackCalled, "OnWrite callback was not called")
assert.NoError(t, err, "Expected no error")
}
func TestDeletePodsAllowsMissing(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
podControl := RealPodControl{

View file

@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon/metrics"
"k8s.io/kubernetes/pkg/controller/daemon/util"
consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency"
"k8s.io/kubernetes/pkg/features"
)
@ -150,7 +151,7 @@ type DaemonSetsController struct {
failedPodsBackoff *flowcontrol.Backoff
consistencyStore util.ConsistencyStore
consistencyStore consistencyutil.ConsistencyStore
}
// NewDaemonSetsController creates a new DaemonSetsController
@ -165,14 +166,17 @@ func NewDaemonSetsController(
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
logger := klog.FromContext(ctx)
var consistencyStore util.ConsistencyStore
var consistencyStore consistencyutil.ConsistencyStore
var podWriteCallback func(pod *v1.Pod, ds *metav1.OwnerReference)
if utilfeature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyDaemonSet) {
consistencyStore = util.NewConsistencyStore(map[schema.GroupResource]util.LastSyncRVGetter{
consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{
podGroupResource: podInformer.Informer().GetStore(),
daemonsetGroupResource: daemonSetInformer.Informer().GetStore(),
})
podWriteCallback = func(pod *v1.Pod, ds *metav1.OwnerReference) {
if ds == nil {
return
}
consistencyStore.WroteAt(
types.NamespacedName{
Namespace: pod.Namespace,
@ -184,7 +188,7 @@ func NewDaemonSetsController(
)
}
} else {
consistencyStore = util.NewNoopConsistencyStore()
consistencyStore = consistencyutil.NewNoopConsistencyStore()
}
dsc := &DaemonSetsController{
@ -1282,7 +1286,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
// continue since our reads will not be in sync with our previously enacted
// state.
if err := dsc.consistencyStore.EnsureReady(dsNamespacedName); err != nil {
var consistencyErr *util.ConsistencyError
var consistencyErr *consistencyutil.ConsistencyError
if errors.As(err, &consistencyErr) {
metrics.DaemonsetRequeueSkips.WithLabelValues(
consistencyErr.GroupResource.Group,

View file

@ -18,6 +18,7 @@ package job
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
@ -30,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -51,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/job/util"
consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
@ -130,6 +133,9 @@ type Controller struct {
// finishedJobExpectations contains the job ids for which the job status is finished
// but the corresponding event is not yet received.
finishedJobExpectations sync.Map
// consistencyStore stores information about the consistency of the job.
consistencyStore consistencyutil.ConsistencyStore
}
type syncJobCtx struct {
@ -167,6 +173,17 @@ type orphanPodKey struct {
value string
}
var (
jobGroupResource = schema.GroupResource{
Group: "batch",
Resource: "jobs",
}
podGroupResource = schema.GroupResource{
Group: "",
Resource: "pods",
}
)
// NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.
func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) {
@ -177,11 +194,37 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
logger := klog.FromContext(ctx)
var consistencyStore consistencyutil.ConsistencyStore
var podWriteCallback func(pod *v1.Pod, ds *metav1.OwnerReference)
if feature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyJob) {
consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{
podGroupResource: podInformer.Informer().GetStore(),
jobGroupResource: jobInformer.Informer().GetStore(),
})
podWriteCallback = func(pod *v1.Pod, job *metav1.OwnerReference) {
if job == nil {
return
}
consistencyStore.WroteAt(
types.NamespacedName{
Namespace: pod.Namespace,
Name: job.Name,
},
job.UID,
podGroupResource,
pod.ResourceVersion,
)
}
} else {
consistencyStore = consistencyutil.NewNoopConsistencyStore()
}
jm := &Controller{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
OnWrite: podWriteCallback,
},
expectations: controller.NewControllerExpectations(),
finalizerExpectations: newUIDTrackingExpectations(),
@ -192,6 +235,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
clock: clock,
podBackoffStore: newBackoffStore(),
finishedJobExpectations: sync.Map{},
consistencyStore: consistencyStore,
}
if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -832,12 +876,28 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if len(ns) == 0 || len(name) == 0 {
return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
}
jobNamespacedName := types.NamespacedName{Namespace: ns, Name: name}
// If our writes have not yet been observed by our informers, we should not
// continue since our reads will not be in sync with our previously enacted
// state.
if err := jm.consistencyStore.EnsureReady(jobNamespacedName); err != nil {
var consistencyErr *consistencyutil.ConsistencyError
if errors.As(err, &consistencyErr) {
metrics.JobRequeueSkips.WithLabelValues(
consistencyErr.GroupResource.Group,
consistencyErr.GroupResource.Resource,
).Inc()
}
return err
}
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(4).Info("Job has been deleted", "key", key)
jm.expectations.DeleteExpectations(logger, key)
jm.finalizerExpectations.deleteExpectations(logger, key)
jm.consistencyStore.Clear(jobNamespacedName, "")
err := jm.podBackoffStore.removeBackoffRecord(key)
if err != nil {
@ -1889,12 +1949,31 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P
// updateJobStatus calls the API to update the job status.
func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
job, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
if err != nil {
return nil, err
}
jm.consistencyStore.WroteAt(
types.NamespacedName{Name: job.Name, Namespace: job.Namespace},
job.UID,
jobGroupResource,
job.ResourceVersion,
)
return job, err
}
func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error {
_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch(
job, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch(
ctx, job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
if err != nil {
return err
}
jm.consistencyStore.WroteAt(
types.NamespacedName{Name: job.Name, Namespace: job.Namespace},
job.UID,
jobGroupResource,
job.ResourceVersion,
)
return err
}

View file

@ -159,6 +159,19 @@ Possible values of the "reason" label are:
Possible values of the "status" label are:
"succeeded", "failed".`,
}, []string{"reason", "status"})
// JobRequeueSkips track the number of job syncs skipped due to a stale
// watch cache.
JobRequeueSkips = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "stale_sync_skips_total",
Help: "Total number of Job syncs skipped due to a stale watch cache.",
StabilityLevel: metrics.ALPHA,
},
// These are the labels (dimensions)
[]string{"group", "resource"},
)
)
const (
@ -212,5 +225,6 @@ func Register() {
legacyregistry.MustRegister(JobFinishedIndexesTotal)
legacyregistry.MustRegister(JobPodsCreationTotal)
legacyregistry.MustRegister(JobByExternalControllerTotal)
legacyregistry.MustRegister(JobRequeueSkips)
})
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package consistency
import (
"fmt"

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package consistency
import (
"sync"

View file

@ -942,6 +942,13 @@ const (
// prior to running a reconcile on the same object.
StaleControllerConsistencyDaemonSet featuregate.Feature = "StaleControllerConsistencyDaemonSet"
// owner: @michaelasp
// kep: http://kep.k8s.io/5647
//
// Introduces the ability for the Job controller to be able to read its writes
// prior to running a reconcile on the same object.
StaleControllerConsistencyJob featuregate.Feature = "StaleControllerConsistencyJob"
// owner: @liggitt
//
// Mitigates spurious statefulset rollouts due to controller revision comparison mismatches
@ -1786,6 +1793,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
StaleControllerConsistencyJob: {
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
StatefulSetSemanticRevisionComparison: {
// This is a mitigation for a 1.34 regression due to serialization differences that cannot be feature-gated,
// so this mitigation should not auto-disable even if emulating versions prior to 1.34 with --emulation-version.
@ -2407,6 +2418,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
StaleControllerConsistencyDaemonSet: {featuregate.Feature(clientfeatures.AtomicFIFO)},
StaleControllerConsistencyJob: {featuregate.Feature(clientfeatures.AtomicFIFO)},
StatefulSetSemanticRevisionComparison: {},
StorageCapacityScoring: {},

View file

@ -180,6 +180,7 @@
| SidecarContainers | :ballot_box_with_check: 1.29+ | :closed_lock_with_key: 1.33+ | 1.28 | 1.291.32 | 1.33 | | | [code](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| SizeBasedListCostEstimate | :ballot_box_with_check: 1.34+ | | | 1.34 | | | | [code](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StaleControllerConsistencyDaemonSet | :ballot_box_with_check: 1.36+ | | | 1.36 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StaleControllerConsistencyJob | :ballot_box_with_check: 1.36+ | | | 1.36 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StatefulSetSemanticRevisionComparison | :ballot_box_with_check: 1.0+ | | | 1.0 | | | | [code](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StorageCapacityScoring | | | 1.33 | | | | | [code](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StorageNamespaceIndex | :ballot_box_with_check: 1.30+ | | | 1.301.32 | | 1.33 | | [code](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |

View file

@ -1741,6 +1741,12 @@
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: StaleControllerConsistencyJob
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: StatefulSetSemanticRevisionComparison
versionedSpecs:
- default: true