diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index c6c3b31f0d9..e3421f30c5e 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -482,6 +482,7 @@ type PodControlInterface interface { type RealPodControl struct { KubeClient clientset.Interface Recorder record.EventRecorder + OnWrite func(*v1.Pod, *metav1.OwnerReference) } var _ PodControlInterface = &RealPodControl{} @@ -551,11 +552,15 @@ func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespac if len(generateName) > 0 { pod.ObjectMeta.GenerateName = generateName } - return r.createPods(ctx, namespace, pod, controllerObject) + return r.createPods(ctx, namespace, pod, controllerObject, controllerRef) } func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error { - _, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) + pod, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) + if err != nil && r.OnWrite != nil { + ownerRef := metav1.GetControllerOfNoCopy(pod) + r.OnWrite(pod, ownerRef) + } return err } @@ -584,7 +589,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec return pod, nil } -func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error { +func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object, controllerRef *metav1.OwnerReference) error { if len(labels.Set(pod.Labels)) == 0 { return fmt.Errorf("unable to create pods, no labels") } @@ -597,6 +602,9 @@ func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v return err } logger := klog.FromContext(ctx) + if r.OnWrite != nil { + r.OnWrite(newPod, controllerRef) + } accessor, err := meta.Accessor(object) if err != nil { logger.Error(err, "parentObject does not have ObjectMeta") diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 91c9dcdc3b6..04d6e334c12 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -18,6 +18,7 @@ package daemon import ( "context" + "errors" "fmt" "reflect" "sort" @@ -30,6 +31,8 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -51,6 +54,7 @@ import ( "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/daemon/metrics" "k8s.io/kubernetes/pkg/controller/daemon/util" "k8s.io/kubernetes/pkg/features" ) @@ -79,6 +83,17 @@ const ( SucceededDaemonPodReason = "SucceededDaemonPod" ) +var ( + daemonsetGroupResource = schema.GroupResource{ + Group: "apps", + Resource: "daemonsets", + } + podGroupResource = schema.GroupResource{ + Group: "", + Resource: "pods", + } +) + // controllerKind contains the schema.GroupVersionKind for this controller type. var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet") @@ -134,6 +149,8 @@ type DaemonSetsController struct { nodeUpdateQueue workqueue.TypedRateLimitingInterface[string] failedPodsBackoff *flowcontrol.Backoff + + consistencyStore util.ConsistencyStore } // NewDaemonSetsController creates a new DaemonSetsController @@ -148,6 +165,28 @@ func NewDaemonSetsController( ) (*DaemonSetsController, error) { eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) logger := klog.FromContext(ctx) + var consistencyStore util.ConsistencyStore + var podWriteCallback func(pod *v1.Pod, ds *metav1.OwnerReference) + if utilfeature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyDaemonSet) { + consistencyStore = util.NewConsistencyStore(map[schema.GroupResource]util.LastSyncRVGetter{ + podGroupResource: podInformer.Informer().GetStore(), + daemonsetGroupResource: daemonSetInformer.Informer().GetStore(), + }) + podWriteCallback = func(pod *v1.Pod, ds *metav1.OwnerReference) { + consistencyStore.WroteAt( + types.NamespacedName{ + Namespace: pod.Namespace, + Name: ds.Name, + }, + ds.UID, + podGroupResource, + pod.ResourceVersion, + ) + } + } else { + consistencyStore = util.NewNoopConsistencyStore() + } + dsc := &DaemonSetsController{ kubeClient: kubeClient, eventBroadcaster: eventBroadcaster, @@ -155,7 +194,9 @@ func NewDaemonSetsController( podControl: controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}), + OnWrite: podWriteCallback, }, + crControl: controller.RealControllerRevisionControl{ KubeClient: kubeClient, }, @@ -173,6 +214,7 @@ func NewDaemonSetsController( Name: "daemonset-node-updates", }, ), + consistencyStore: consistencyStore, } daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -205,6 +247,7 @@ func NewDaemonSetsController( // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete // more pods until all the effects (expectations) of a daemon set's create/delete have been observed. + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { dsc.addPod(logger, obj) @@ -239,6 +282,7 @@ func NewDaemonSetsController( dsc.failedPodsBackoff = failedPodsBackoff + metrics.Register() return dsc, nil } @@ -290,6 +334,13 @@ func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interfa utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err)) return } + dsc.consistencyStore.Clear( + types.NamespacedName{ + Name: ds.Name, + Namespace: ds.Namespace, + }, + ds.UID, + ) // Delete expectations for the DaemonSet so if we create a new one with the same name it starts clean dsc.expectations.DeleteExpectations(logger, key) @@ -1095,7 +1146,7 @@ func storeDaemonSetStatus( updatedNumberScheduled, numberAvailable, numberUnavailable int, - updateObservedGen bool) error { + updateObservedGen bool) (*apps.DaemonSet, error) { if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled && int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled && int(ds.Status.NumberMisscheduled) == numberMisscheduled && @@ -1104,7 +1155,7 @@ func storeDaemonSetStatus( int(ds.Status.NumberAvailable) == numberAvailable && int(ds.Status.NumberUnavailable) == numberUnavailable && ds.Status.ObservedGeneration >= ds.Generation { - return nil + return nil, nil } toUpdate := ds.DeepCopy() @@ -1122,8 +1173,9 @@ func storeDaemonSetStatus( toUpdate.Status.NumberAvailable = int32(numberAvailable) toUpdate.Status.NumberUnavailable = int32(numberUnavailable) - if _, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil { - return nil + var result *apps.DaemonSet + if result, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil { + return result, nil } // Stop retrying if we exceed statusUpdateRetries - the DaemonSet will be requeued with a rate limit. @@ -1134,10 +1186,10 @@ func storeDaemonSetStatus( if toUpdate, getErr = dsClient.Get(ctx, ds.Name, metav1.GetOptions{}); getErr != nil { // If the GET fails we can't trust status.Replicas anymore. This error // is bound to be more interesting than the update failure. - return getErr + return nil, getErr } } - return updateErr + return nil, updateErr } func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error { @@ -1188,10 +1240,18 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds * } numberUnavailable := desiredNumberScheduled - numberAvailable - err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen) + updatedDS, err := storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen) if err != nil { return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err) } + if updatedDS != nil { + dsc.consistencyStore.WroteAt( + types.NamespacedName{Name: ds.Name, Namespace: ds.Namespace}, + updatedDS.UID, + daemonsetGroupResource, + updatedDS.ResourceVersion, + ) + } // Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew. if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable { @@ -1212,10 +1272,31 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) if err != nil { return err } + + dsNamespacedName := types.NamespacedName{ + Namespace: namespace, + Name: name, + } + + // If our writes have not yet been observed by our informers, we should not + // continue since our reads will not be in sync with our previously enacted + // state. + if err := dsc.consistencyStore.EnsureReady(dsNamespacedName); err != nil { + var consistencyErr *util.ConsistencyError + if errors.As(err, &consistencyErr) { + metrics.DaemonsetRequeueSkips.WithLabelValues( + consistencyErr.GroupResource.Group, + consistencyErr.GroupResource.Resource, + ).Inc() + } + return err + } + ds, err := dsc.dsLister.DaemonSets(namespace).Get(name) if apierrors.IsNotFound(err) { logger.V(3).Info("Daemon set has been deleted", "daemonset", key) dsc.expectations.DeleteExpectations(logger, key) + dsc.consistencyStore.Clear(dsNamespacedName, "") return nil } if err != nil { diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index 221ee3b9acb..4e3dc28c391 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -3621,7 +3621,7 @@ func TestStoreDaemonSetStatus(t *testing.T) { } return true, ds, nil }) - if err := storeDaemonSetStatus(context.TODO(), fakeClient.AppsV1().DaemonSets("default"), ds, 2, 2, 2, 2, 2, 2, 2, true); err != tt.expectedError { + if _, err := storeDaemonSetStatus(context.TODO(), fakeClient.AppsV1().DaemonSets("default"), ds, 2, 2, 2, 2, 2, 2, 2, true); !errors.Is(err, tt.expectedError) { t.Errorf("storeDaemonSetStatus() got %v, expected %v", err, tt.expectedError) } if getCalled != tt.expectedGetCalled { diff --git a/pkg/controller/daemon/metrics/metrics.go b/pkg/controller/daemon/metrics/metrics.go new file mode 100644 index 00000000000..ba76c764436 --- /dev/null +++ b/pkg/controller/daemon/metrics/metrics.go @@ -0,0 +1,48 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const DaemonControllerSubsystem = "daemonset_controller" + +var ( + DaemonsetRequeueSkips = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: DaemonControllerSubsystem, + Name: "stale_sync_skips_total", + Help: "Total number of DaemonSet syncs skipped due to a stale watch cache.", + StabilityLevel: metrics.ALPHA, + }, + // These are the labels (dimensions) + []string{"group", "resource"}, + ) +) + +var registerMetrics sync.Once + +// Register registers DaemonSet Controller metrics. +func Register() { + registerMetrics.Do(func() { + legacyregistry.MustRegister(DaemonsetRequeueSkips) + }) +} diff --git a/pkg/controller/daemon/util/consistency.go b/pkg/controller/daemon/util/consistency.go new file mode 100644 index 00000000000..12b7403cfe0 --- /dev/null +++ b/pkg/controller/daemon/util/consistency.go @@ -0,0 +1,217 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/resourceversion" +) + +type ConsistencyStore interface { + // WroteAt records a written RV for an owned resource. + WroteAt(owner types.NamespacedName, ownerUID types.UID, resource schema.GroupResource, rv string) + // Clear wipes the owner if the UID matches, if left empty it will wipe no + // matter what the UID is. + Clear(owner types.NamespacedName, ownerUID types.UID) + // EnsureReady queries the ConsistencyStore to check whether or not the + // stores records are up to date, returning an error if they are not. + EnsureReady(owner types.NamespacedName) error +} + +// ConsistencyError is an error type returned by EnsureReady with information +// about the resource versions and GroupKind that caused the error. +type ConsistencyError struct { + ReadRV string + WroteRV string + GroupResource schema.GroupResource +} + +func (c *ConsistencyError) Error() string { + return fmt.Sprintf("read version: %s is not as new as written version: %s for group resource %s", c.ReadRV, c.WroteRV, c.GroupResource.String()) +} + +var _ ConsistencyStore = &RealConsistencyStore{} + +type LastSyncRVGetter interface { + LastStoreSyncResourceVersion() string +} + +type RealConsistencyStore struct { + // writesLock guards reads/additions/deletions to the writes map. + // individual records are responsible for managing their own thread safety. + writesLock sync.RWMutex + // writes is a map of owner -> ownerRecord + writes map[types.NamespacedName]*ownerRecord + + stores map[schema.GroupResource]LastSyncRVGetter +} + +func NewConsistencyStore(stores map[schema.GroupResource]LastSyncRVGetter) *RealConsistencyStore { + return &RealConsistencyStore{ + writes: map[types.NamespacedName]*ownerRecord{}, + stores: stores, + } +} + +// getWrittenRecord returns the record for the given owner, or nil if no record exists. +func (c *RealConsistencyStore) getWrittenRecord(owner types.NamespacedName) *ownerRecord { + c.writesLock.RLock() + defer c.writesLock.RUnlock() + return c.writes[owner] +} + +// ensureWrittenRecord returns a ownerRecord for the given owner and ownerUID. +// If there is no current record, one is created. +// If there is a current record with a different ownerUID, it is replaced with an empty record for the specified ownerUID. +func (c *RealConsistencyStore) ensureWrittenRecord(owner types.NamespacedName, ownerUID types.UID) *ownerRecord { + // fast path, already exists + if record := c.getWrittenRecord(owner); record != nil && record.ownerUID == ownerUID { + return record + } + + // slow path, init + c.writesLock.Lock() + defer c.writesLock.Unlock() + // check again after write lock + if record := c.writes[owner]; record != nil && record.ownerUID == ownerUID { + return record + } + // initialize to the given uid + record := newOwnerRecord(ownerUID) + c.writes[owner] = record + return record +} + +// WroteAt writes the latest written RV if it is greater than the currently +// written RV for the owner. +func (c *RealConsistencyStore) WroteAt(owner types.NamespacedName, ownerUID types.UID, resource schema.GroupResource, rv string) { + c.ensureWrittenRecord(owner, ownerUID).WroteAt(resource, rv) +} + +// Clear deletes the record for owner if it exists and matches the specified +// ownerUID (or the specified ownerUID is empty) +func (c *RealConsistencyStore) Clear(owner types.NamespacedName, ownerUID types.UID) { + // deleted owners typically have an existing record, not worth checking the fast path for missing records + c.writesLock.Lock() + defer c.writesLock.Unlock() + if record := c.writes[owner]; record != nil && (len(ownerUID) == 0 || record.ownerUID == ownerUID) { + delete(c.writes, owner) + } +} + +// EnsureReady returns nil if observed resource versions are at least as new as +// any recorded versions for the given owner, otherwise returning the error of +// what happened. Must not be called concurrent with WroteAt for the same owner. +func (c *RealConsistencyStore) EnsureReady(owner types.NamespacedName) error { + record := c.getWrittenRecord(owner) + if record == nil { + return nil + } + err := record.EnsureReady(c) + if err == nil { + c.Clear(owner, record.ownerUID) + return nil + } + return err +} + +type ownerRecord struct { + // ownerUID must not be mutated after creation + ownerUID types.UID + + versionsLock sync.Mutex + versions map[schema.GroupResource]string +} + +func newOwnerRecord(ownerUID types.UID) *ownerRecord { + return &ownerRecord{ownerUID: ownerUID, versions: map[schema.GroupResource]string{}} +} + +// WroteAt increments the written resource version of an ownerRecord if it is +// the newest seen resource version for that resource. +func (w *ownerRecord) WroteAt(resource schema.GroupResource, rv string) { + w.versionsLock.Lock() + defer w.versionsLock.Unlock() + if _, ok := w.versions[resource]; !ok { + w.versions[resource] = rv + return + } + cmp, err := resourceversion.CompareResourceVersion(w.versions[resource], rv) + if err == nil && cmp >= 0 { + return + } + w.versions[resource] = rv +} + +// EnsureReady checks whether or not the ownerRecord is ready compared to the +// read resource versions in the consistency store. +func (w *ownerRecord) EnsureReady(c *RealConsistencyStore) error { + w.versionsLock.Lock() + defer w.versionsLock.Unlock() + for gr, wroteRV := range w.versions { + store, exists := c.stores[gr] + if !exists || store == nil { + continue + } + readRV := store.LastStoreSyncResourceVersion() + if readRV == "" { + // Since we wait for the store to be ready, the only time "" is if the + // LastStoreSyncResourceVersion() feature is not enabled. + continue + } + i, err := resourceversion.CompareResourceVersion(wroteRV, readRV) + if err != nil { + // comparison errors indicate there's a data problem with resource versions, continue so we don't block syncing + continue + } + if i > 0 { + // read version is not as new as owner version, not ready + return &ConsistencyError{ + WroteRV: wroteRV, + ReadRV: readRV, + GroupResource: gr, + } + } + } + return nil +} + +// NoopConsistencyStore is a consistency store that stores nothing and always +// returns IsReady as true. To be used when the associated feature gate is not +// enabled. +type NoopConsistencyStore struct{} + +var _ ConsistencyStore = &NoopConsistencyStore{} + +func (*NoopConsistencyStore) WroteAt(owner types.NamespacedName, ownerUID types.UID, resource schema.GroupResource, rv string) { +} + +func (*NoopConsistencyStore) ReadAt(resource schema.GroupResource, rv string) {} + +func (*NoopConsistencyStore) Clear(owner types.NamespacedName, ownerUID types.UID) {} + +func (*NoopConsistencyStore) EnsureReady(owner types.NamespacedName) error { + return nil +} + +func NewNoopConsistencyStore() *NoopConsistencyStore { + return &NoopConsistencyStore{} +} diff --git a/pkg/controller/daemon/util/consistency_test.go b/pkg/controller/daemon/util/consistency_test.go new file mode 100644 index 00000000000..6915145ba62 --- /dev/null +++ b/pkg/controller/daemon/util/consistency_test.go @@ -0,0 +1,292 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" +) + +func TestOwnerRecord_WroteAt(t *testing.T) { + uid := types.UID("owner-uid-1") + or := newOwnerRecord(uid) + assert.Equal(t, uid, or.ownerUID) + require.NotNil(t, or.versions) + + grPod := schema.GroupResource{Group: "", Resource: "pods"} + grDs := schema.GroupResource{Group: "apps", Resource: "daemonsets"} + + // First write + or.WroteAt(grPod, "5") + assert.Equal(t, "5", or.versions[grPod]) + + // Second write (higher) + or.WroteAt(grPod, "10") + assert.Equal(t, "10", or.versions[grPod]) + + // Third write (lower) + or.WroteAt(grPod, "8") + assert.Equal(t, "10", or.versions[grPod]) + + // Write to different resource + or.WroteAt(grDs, "1") + assert.Equal(t, "1", or.versions[grDs]) + assert.Equal(t, "10", or.versions[grPod], "Pod version should be unchanged") +} + +func TestOwnerRecord_IsReady(t *testing.T) { + uid := types.UID("owner-uid-1") + or := newOwnerRecord(uid) + grPod := schema.GroupResource{Group: "", Resource: "pods"} + grDs := schema.GroupResource{Group: "apps", Resource: "daemonsets"} + podStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + dsStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + resourceStores := map[schema.GroupResource]LastSyncRVGetter{ + grPod: podStore, + grDs: dsStore, + } + + store := NewConsistencyStore(resourceStores) + + // Case 1: No writes. Should be ready. + require.NoError(t, or.EnsureReady(store), "Should be ready if no writes are recorded") + + // Add a write + or.WroteAt(grPod, "10") + + // Case 2: Write exists, but no reads. Should stay ready. + require.NoError(t, or.EnsureReady(store), "Should stay ready if write exists and no read") + + // Add a read, but it's lower + podStore.Bookmark("5") + + // Case 3: Write exists, read is lower. Not ready. + require.Error(t, or.EnsureReady(store), "Not ready if read < write") + + // Add a read, equal + podStore.Bookmark("10") + + // Case 4: Write exists, read is equal. Ready. + require.NoError(t, or.EnsureReady(store), "Ready if read == write") + + // Add a read, higher + podStore.Bookmark("15") + + // Case 5: Write exists, read is higher. Ready. + require.NoError(t, or.EnsureReady(store), "Ready if read > write") + + // Add a second write and read + or.WroteAt(grDs, "100") + dsStore.Bookmark("50") + + // Case 6: One resource ready, one not. Not ready. + require.Error(t, or.EnsureReady(store), "Not ready if one of multiple writes is not ready (no read)") + + // Make the second one ready + dsStore.Bookmark("100") + + // Case 7: All resources ready. Ready. + require.NoError(t, or.EnsureReady(store), "Ready if all writes are ready") +} + +func TestConsistencyStore_New(t *testing.T) { + store := NewConsistencyStore(nil) + require.NotNil(t, store) + require.NotNil(t, store.writes) + assert.Empty(t, store.writes) +} + +func TestConsistencyStore_EnsureWrittenRecord(t *testing.T) { + store := NewConsistencyStore(nil) + owner := types.NamespacedName{Name: "owner1"} + uid1 := types.UID("uid-1") + uid2 := types.UID("uid-2") + + // Create new + r1 := store.ensureWrittenRecord(owner, uid1) + require.NotNil(t, r1) + assert.Equal(t, uid1, r1.ownerUID) + assert.Same(t, r1, store.writes[owner]) + + // Get existing with same UID + r2 := store.ensureWrittenRecord(owner, uid1) + assert.Same(t, r1, r2, "Should return existing record for same UID") + + // Get existing with different UID (should replace) + r3 := store.ensureWrittenRecord(owner, uid2) + require.NotNil(t, r3) + assert.NotSame(t, r1, r3, "Should be a new record") + assert.Equal(t, uid2, r3.ownerUID) + assert.Same(t, r3, store.writes[owner], "New record should replace old one in map") + assert.Empty(t, r3.versions, "New record should be empty") + + // Check that old record is detached + grPod := schema.GroupResource{Group: "", Resource: "pods"} + r1.WroteAt(grPod, "10") // Write to old record + assert.Empty(t, r3.versions, "Write to old record should not affect new record") +} + +func TestConsistencyStore_EnsureWrittenRecord_Concurrent(t *testing.T) { + store := NewConsistencyStore(nil) + owner := types.NamespacedName{Name: "owner1"} + uid1 := types.UID("uid-1") + uid2 := types.UID("uid-2") + + wg := sync.WaitGroup{} + numGoroutines := 50 + + // Concurrent creation with same UID + var firstRecord *ownerRecord + var once sync.Once + for range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + r := store.ensureWrittenRecord(owner, uid1) + assert.Equal(t, uid1, r.ownerUID) + once.Do(func() { + firstRecord = r + }) + assert.Same(t, firstRecord, r) + }() + } + wg.Wait() + require.NotNil(t, firstRecord) + assert.Len(t, store.writes, 1) + + // Concurrent replacement with new UID + var replacementRecord *ownerRecord + var replaceOnce sync.Once + for range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + r := store.ensureWrittenRecord(owner, uid2) + assert.Equal(t, uid2, r.ownerUID) + replaceOnce.Do(func() { + replacementRecord = r + }) + assert.Same(t, replacementRecord, r) + }() + } + wg.Wait() + require.NotNil(t, replacementRecord) + assert.Len(t, store.writes, 1) + assert.Same(t, replacementRecord, store.writes[owner]) + assert.NotSame(t, firstRecord, replacementRecord) +} + +func TestConsistencyStore_WroteAt(t *testing.T) { + store := NewConsistencyStore(nil) + owner := types.NamespacedName{Name: "owner1"} + uid1 := types.UID("uid-1") + grPod := schema.GroupResource{Group: "", Resource: "pods"} + + store.WroteAt(owner, uid1, grPod, "10") + + record := store.getWrittenRecord(owner) + require.NotNil(t, record) + assert.Equal(t, uid1, record.ownerUID) + + assert.Equal(t, "10", record.versions[grPod]) + + // Write again + store.WroteAt(owner, uid1, grPod, "20") + assert.Equal(t, "20", record.versions[grPod]) +} + +func TestConsistencyStore_Clear(t *testing.T) { + store := NewConsistencyStore(nil) + owner1 := types.NamespacedName{Name: "owner1"} + owner2 := types.NamespacedName{Name: "owner2"} + uid1 := types.UID("uid-1") + uid2 := types.UID("uid-2") + + // Setup + r1 := store.ensureWrittenRecord(owner1, uid1) + r2 := store.ensureWrittenRecord(owner2, uid2) + require.Len(t, store.writes, 2) + + // Clear non-existent + store.Clear(types.NamespacedName{Name: "non-existent"}, uid1) + assert.Len(t, store.writes, 2) + + // Clear with wrong UID + store.Clear(owner1, uid2) + assert.Len(t, store.writes, 2, "Should not clear with wrong UID") + assert.Same(t, r1, store.writes[owner1]) + + // Clear with correct UID + store.Clear(owner1, uid1) + assert.Len(t, store.writes, 1, "Should clear with correct UID") + assert.Nil(t, store.writes[owner1]) + assert.Same(t, r2, store.writes[owner2], "Other record should remain") + + // Re-add r1 + store.ensureWrittenRecord(owner1, uid1) + require.Len(t, store.writes, 2) + + // Clear with empty UID + store.Clear(owner1, "") + assert.Len(t, store.writes, 1, "Should clear with empty UID") + assert.Nil(t, store.writes[owner1]) + assert.Same(t, r2, store.writes[owner2]) +} + +func TestConsistencyStore_IsReady(t *testing.T) { + owner1 := types.NamespacedName{Name: "owner1"} + uid1 := types.UID("uid-1") + grPod := schema.GroupResource{Group: "", Resource: "pods"} + podStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + resourceStores := map[schema.GroupResource]LastSyncRVGetter{ + grPod: podStore, + } + + store := NewConsistencyStore(resourceStores) + + // Case 1: No record. Ready. + require.NoError(t, store.EnsureReady(owner1), "Ready if no record exists") + + // Add a write and initial read rv + podStore.Bookmark("5") + store.WroteAt(owner1, uid1, grPod, "10") + + // Case 2: Record exists, read < write. Not ready. + require.Error(t, store.EnsureReady(owner1), "Not ready if read < write") + + // Add read, equal + podStore.Bookmark("10") + + // Case 3: Record exists, read == write. Ready. + require.NoError(t, store.EnsureReady(owner1), "Ready if read == write") + + // Add read, higher + podStore.Bookmark("15") + + // Case 4: Record exists, read > write. Ready. + require.NoError(t, store.EnsureReady(owner1), "Ready if read > write") + + // Assert that the record no longer exists, we no longer need to track the + // reads as long as the read has been higher than the latest write. + assert.Nil(t, store.getWrittenRecord(owner1), "Written record should no longer exist") +} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 4b1a33350ec..facc83c9da8 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -935,6 +935,13 @@ const ( // pod's lifecycle and will not block pod termination. SidecarContainers featuregate.Feature = "SidecarContainers" + // owner: @michaelasp + // kep: http://kep.k8s.io/5647 + // + // Introduces the ability for the DaemonSet controller to be able to read its writes + // prior to running a reconcile on the same object. + StaleControllerConsistencyDaemonSet featuregate.Feature = "StaleControllerConsistencyDaemonSet" + // owner: @liggitt // // Mitigates spurious statefulset rollouts due to controller revision comparison mismatches @@ -1766,6 +1773,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.33"), Default: true, LockToDefault: true, PreRelease: featuregate.GA}, // GA in 1.33 remove in 1.36 }, + StaleControllerConsistencyDaemonSet: { + {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, + }, + StatefulSetSemanticRevisionComparison: { // This is a mitigation for a 1.34 regression due to serialization differences that cannot be feature-gated, // so this mitigation should not auto-disable even if emulating versions prior to 1.34 with --emulation-version. @@ -2374,6 +2385,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature SidecarContainers: {}, + StaleControllerConsistencyDaemonSet: {featuregate.Feature(clientfeatures.AtomicFIFO)}, + StatefulSetSemanticRevisionComparison: {}, StorageCapacityScoring: {}, @@ -2508,10 +2521,6 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature } func init() { - runtime.Must(utilfeature.DefaultMutableFeatureGate.AddVersioned(defaultVersionedKubernetesFeatureGates)) - runtime.Must(utilfeature.DefaultMutableFeatureGate.AddDependencies(defaultKubernetesFeatureGateDependencies)) - runtime.Must(zpagesfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate)) - // Register all client-go features with kube's feature gate instance and make all client-go // feature checks use kube's instance. The effect is that for kube binaries, client-go // features are wired to the existing --feature-gates flag just as all other features @@ -2520,4 +2529,8 @@ func init() { ca := &clientAdapter{utilfeature.DefaultMutableFeatureGate} runtime.Must(clientfeatures.AddVersionedFeaturesToExistingFeatureGates(ca)) clientfeatures.ReplaceFeatureGates(ca) + + runtime.Must(utilfeature.DefaultMutableFeatureGate.AddVersioned(defaultVersionedKubernetesFeatureGates)) + runtime.Must(utilfeature.DefaultMutableFeatureGate.AddDependencies(defaultKubernetesFeatureGateDependencies)) + runtime.Must(zpagesfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate)) } diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index 382cdab24c4..21859ba03f5 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -178,6 +178,7 @@ | ServiceAccountTokenPodNodeInfo | :ballot_box_with_check: 1.30+ | :closed_lock_with_key: 1.32+ | 1.29 | 1.30–1.31 | 1.32– | | | [code](https://cs.k8s.io/?q=%5CbServiceAccountTokenPodNodeInfo%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbServiceAccountTokenPodNodeInfo%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | SidecarContainers | :ballot_box_with_check: 1.29+ | :closed_lock_with_key: 1.33+ | 1.28 | 1.29–1.32 | 1.33– | | | [code](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | SizeBasedListCostEstimate | :ballot_box_with_check: 1.34+ | | | 1.34– | | | | [code](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| StaleControllerConsistencyDaemonSet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StatefulSetSemanticRevisionComparison | :ballot_box_with_check: 1.0+ | | | 1.0– | | | | [code](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StorageCapacityScoring | | | 1.33– | | | | | [code](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | StorageNamespaceIndex | :ballot_box_with_check: 1.30+ | | | 1.30–1.32 | | 1.33– | | [code](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index 5c8ec11af22..1923998bc4c 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1709,6 +1709,12 @@ lockToDefault: false preRelease: Beta version: "1.34" +- name: StaleControllerConsistencyDaemonSet + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.36" - name: StatefulSetSemanticRevisionComparison versionedSpecs: - default: true