Daemonset Consistency

Add the ability for the daemonset controller to figure out whether it has read its own writes for pods and daemonset objects.
This commit is contained in:
Michael Aspinwall 2025-10-31 16:47:28 +00:00
parent b72e248c25
commit 65eb0e94c2
9 changed files with 681 additions and 15 deletions

View file

@ -482,6 +482,7 @@ type PodControlInterface interface {
type RealPodControl struct {
KubeClient clientset.Interface
Recorder record.EventRecorder
OnWrite func(*v1.Pod, *metav1.OwnerReference)
}
var _ PodControlInterface = &RealPodControl{}
@ -551,11 +552,15 @@ func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespac
if len(generateName) > 0 {
pod.ObjectMeta.GenerateName = generateName
}
return r.createPods(ctx, namespace, pod, controllerObject)
return r.createPods(ctx, namespace, pod, controllerObject, controllerRef)
}
func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error {
_, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
pod, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
if err != nil && r.OnWrite != nil {
ownerRef := metav1.GetControllerOfNoCopy(pod)
r.OnWrite(pod, ownerRef)
}
return err
}
@ -584,7 +589,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
return pod, nil
}
func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object, controllerRef *metav1.OwnerReference) error {
if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels")
}
@ -597,6 +602,9 @@ func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v
return err
}
logger := klog.FromContext(ctx)
if r.OnWrite != nil {
r.OnWrite(newPod, controllerRef)
}
accessor, err := meta.Accessor(object)
if err != nil {
logger.Error(err, "parentObject does not have ObjectMeta")

View file

@ -18,6 +18,7 @@ package daemon
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
@ -30,6 +31,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@ -51,6 +54,7 @@ import (
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon/metrics"
"k8s.io/kubernetes/pkg/controller/daemon/util"
"k8s.io/kubernetes/pkg/features"
)
@ -79,6 +83,17 @@ const (
SucceededDaemonPodReason = "SucceededDaemonPod"
)
var (
daemonsetGroupResource = schema.GroupResource{
Group: "apps",
Resource: "daemonsets",
}
podGroupResource = schema.GroupResource{
Group: "",
Resource: "pods",
}
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
@ -134,6 +149,8 @@ type DaemonSetsController struct {
nodeUpdateQueue workqueue.TypedRateLimitingInterface[string]
failedPodsBackoff *flowcontrol.Backoff
consistencyStore util.ConsistencyStore
}
// NewDaemonSetsController creates a new DaemonSetsController
@ -148,6 +165,28 @@ func NewDaemonSetsController(
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
logger := klog.FromContext(ctx)
var consistencyStore util.ConsistencyStore
var podWriteCallback func(pod *v1.Pod, ds *metav1.OwnerReference)
if utilfeature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyDaemonSet) {
consistencyStore = util.NewConsistencyStore(map[schema.GroupResource]util.LastSyncRVGetter{
podGroupResource: podInformer.Informer().GetStore(),
daemonsetGroupResource: daemonSetInformer.Informer().GetStore(),
})
podWriteCallback = func(pod *v1.Pod, ds *metav1.OwnerReference) {
consistencyStore.WroteAt(
types.NamespacedName{
Namespace: pod.Namespace,
Name: ds.Name,
},
ds.UID,
podGroupResource,
pod.ResourceVersion,
)
}
} else {
consistencyStore = util.NewNoopConsistencyStore()
}
dsc := &DaemonSetsController{
kubeClient: kubeClient,
eventBroadcaster: eventBroadcaster,
@ -155,7 +194,9 @@ func NewDaemonSetsController(
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
OnWrite: podWriteCallback,
},
crControl: controller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
@ -173,6 +214,7 @@ func NewDaemonSetsController(
Name: "daemonset-node-updates",
},
),
consistencyStore: consistencyStore,
}
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -205,6 +247,7 @@ func NewDaemonSetsController(
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dsc.addPod(logger, obj)
@ -239,6 +282,7 @@ func NewDaemonSetsController(
dsc.failedPodsBackoff = failedPodsBackoff
metrics.Register()
return dsc, nil
}
@ -290,6 +334,13 @@ func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interfa
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
return
}
dsc.consistencyStore.Clear(
types.NamespacedName{
Name: ds.Name,
Namespace: ds.Namespace,
},
ds.UID,
)
// Delete expectations for the DaemonSet so if we create a new one with the same name it starts clean
dsc.expectations.DeleteExpectations(logger, key)
@ -1095,7 +1146,7 @@ func storeDaemonSetStatus(
updatedNumberScheduled,
numberAvailable,
numberUnavailable int,
updateObservedGen bool) error {
updateObservedGen bool) (*apps.DaemonSet, error) {
if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
@ -1104,7 +1155,7 @@ func storeDaemonSetStatus(
int(ds.Status.NumberAvailable) == numberAvailable &&
int(ds.Status.NumberUnavailable) == numberUnavailable &&
ds.Status.ObservedGeneration >= ds.Generation {
return nil
return nil, nil
}
toUpdate := ds.DeepCopy()
@ -1122,8 +1173,9 @@ func storeDaemonSetStatus(
toUpdate.Status.NumberAvailable = int32(numberAvailable)
toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
if _, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil {
return nil
var result *apps.DaemonSet
if result, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil {
return result, nil
}
// Stop retrying if we exceed statusUpdateRetries - the DaemonSet will be requeued with a rate limit.
@ -1134,10 +1186,10 @@ func storeDaemonSetStatus(
if toUpdate, getErr = dsClient.Get(ctx, ds.Name, metav1.GetOptions{}); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
return getErr
return nil, getErr
}
}
return updateErr
return nil, updateErr
}
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
@ -1188,10 +1240,18 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *
}
numberUnavailable := desiredNumberScheduled - numberAvailable
err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
updatedDS, err := storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
if err != nil {
return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
}
if updatedDS != nil {
dsc.consistencyStore.WroteAt(
types.NamespacedName{Name: ds.Name, Namespace: ds.Namespace},
updatedDS.UID,
daemonsetGroupResource,
updatedDS.ResourceVersion,
)
}
// Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
@ -1212,10 +1272,31 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
if err != nil {
return err
}
dsNamespacedName := types.NamespacedName{
Namespace: namespace,
Name: name,
}
// If our writes have not yet been observed by our informers, we should not
// continue since our reads will not be in sync with our previously enacted
// state.
if err := dsc.consistencyStore.EnsureReady(dsNamespacedName); err != nil {
var consistencyErr *util.ConsistencyError
if errors.As(err, &consistencyErr) {
metrics.DaemonsetRequeueSkips.WithLabelValues(
consistencyErr.GroupResource.Group,
consistencyErr.GroupResource.Resource,
).Inc()
}
return err
}
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
dsc.expectations.DeleteExpectations(logger, key)
dsc.consistencyStore.Clear(dsNamespacedName, "")
return nil
}
if err != nil {

View file

@ -3621,7 +3621,7 @@ func TestStoreDaemonSetStatus(t *testing.T) {
}
return true, ds, nil
})
if err := storeDaemonSetStatus(context.TODO(), fakeClient.AppsV1().DaemonSets("default"), ds, 2, 2, 2, 2, 2, 2, 2, true); err != tt.expectedError {
if _, err := storeDaemonSetStatus(context.TODO(), fakeClient.AppsV1().DaemonSets("default"), ds, 2, 2, 2, 2, 2, 2, 2, true); !errors.Is(err, tt.expectedError) {
t.Errorf("storeDaemonSetStatus() got %v, expected %v", err, tt.expectedError)
}
if getCalled != tt.expectedGetCalled {

View file

@ -0,0 +1,48 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const DaemonControllerSubsystem = "daemonset_controller"
var (
DaemonsetRequeueSkips = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: DaemonControllerSubsystem,
Name: "stale_sync_skips_total",
Help: "Total number of DaemonSet syncs skipped due to a stale watch cache.",
StabilityLevel: metrics.ALPHA,
},
// These are the labels (dimensions)
[]string{"group", "resource"},
)
)
var registerMetrics sync.Once
// Register registers DaemonSet Controller metrics.
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(DaemonsetRequeueSkips)
})
}

View file

@ -0,0 +1,217 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/resourceversion"
)
type ConsistencyStore interface {
// WroteAt records a written RV for an owned resource.
WroteAt(owner types.NamespacedName, ownerUID types.UID, resource schema.GroupResource, rv string)
// Clear wipes the owner if the UID matches, if left empty it will wipe no
// matter what the UID is.
Clear(owner types.NamespacedName, ownerUID types.UID)
// EnsureReady queries the ConsistencyStore to check whether or not the
// stores records are up to date, returning an error if they are not.
EnsureReady(owner types.NamespacedName) error
}
// ConsistencyError is an error type returned by EnsureReady with information
// about the resource versions and GroupKind that caused the error.
type ConsistencyError struct {
ReadRV string
WroteRV string
GroupResource schema.GroupResource
}
func (c *ConsistencyError) Error() string {
return fmt.Sprintf("read version: %s is not as new as written version: %s for group resource %s", c.ReadRV, c.WroteRV, c.GroupResource.String())
}
var _ ConsistencyStore = &RealConsistencyStore{}
type LastSyncRVGetter interface {
LastStoreSyncResourceVersion() string
}
type RealConsistencyStore struct {
// writesLock guards reads/additions/deletions to the writes map.
// individual records are responsible for managing their own thread safety.
writesLock sync.RWMutex
// writes is a map of owner -> ownerRecord
writes map[types.NamespacedName]*ownerRecord
stores map[schema.GroupResource]LastSyncRVGetter
}
func NewConsistencyStore(stores map[schema.GroupResource]LastSyncRVGetter) *RealConsistencyStore {
return &RealConsistencyStore{
writes: map[types.NamespacedName]*ownerRecord{},
stores: stores,
}
}
// getWrittenRecord returns the record for the given owner, or nil if no record exists.
func (c *RealConsistencyStore) getWrittenRecord(owner types.NamespacedName) *ownerRecord {
c.writesLock.RLock()
defer c.writesLock.RUnlock()
return c.writes[owner]
}
// ensureWrittenRecord returns a ownerRecord for the given owner and ownerUID.
// If there is no current record, one is created.
// If there is a current record with a different ownerUID, it is replaced with an empty record for the specified ownerUID.
func (c *RealConsistencyStore) ensureWrittenRecord(owner types.NamespacedName, ownerUID types.UID) *ownerRecord {
// fast path, already exists
if record := c.getWrittenRecord(owner); record != nil && record.ownerUID == ownerUID {
return record
}
// slow path, init
c.writesLock.Lock()
defer c.writesLock.Unlock()
// check again after write lock
if record := c.writes[owner]; record != nil && record.ownerUID == ownerUID {
return record
}
// initialize to the given uid
record := newOwnerRecord(ownerUID)
c.writes[owner] = record
return record
}
// WroteAt writes the latest written RV if it is greater than the currently
// written RV for the owner.
func (c *RealConsistencyStore) WroteAt(owner types.NamespacedName, ownerUID types.UID, resource schema.GroupResource, rv string) {
c.ensureWrittenRecord(owner, ownerUID).WroteAt(resource, rv)
}
// Clear deletes the record for owner if it exists and matches the specified
// ownerUID (or the specified ownerUID is empty)
func (c *RealConsistencyStore) Clear(owner types.NamespacedName, ownerUID types.UID) {
// deleted owners typically have an existing record, not worth checking the fast path for missing records
c.writesLock.Lock()
defer c.writesLock.Unlock()
if record := c.writes[owner]; record != nil && (len(ownerUID) == 0 || record.ownerUID == ownerUID) {
delete(c.writes, owner)
}
}
// EnsureReady returns nil if observed resource versions are at least as new as
// any recorded versions for the given owner, otherwise returning the error of
// what happened. Must not be called concurrent with WroteAt for the same owner.
func (c *RealConsistencyStore) EnsureReady(owner types.NamespacedName) error {
record := c.getWrittenRecord(owner)
if record == nil {
return nil
}
err := record.EnsureReady(c)
if err == nil {
c.Clear(owner, record.ownerUID)
return nil
}
return err
}
type ownerRecord struct {
// ownerUID must not be mutated after creation
ownerUID types.UID
versionsLock sync.Mutex
versions map[schema.GroupResource]string
}
func newOwnerRecord(ownerUID types.UID) *ownerRecord {
return &ownerRecord{ownerUID: ownerUID, versions: map[schema.GroupResource]string{}}
}
// WroteAt increments the written resource version of an ownerRecord if it is
// the newest seen resource version for that resource.
func (w *ownerRecord) WroteAt(resource schema.GroupResource, rv string) {
w.versionsLock.Lock()
defer w.versionsLock.Unlock()
if _, ok := w.versions[resource]; !ok {
w.versions[resource] = rv
return
}
cmp, err := resourceversion.CompareResourceVersion(w.versions[resource], rv)
if err == nil && cmp >= 0 {
return
}
w.versions[resource] = rv
}
// EnsureReady checks whether or not the ownerRecord is ready compared to the
// read resource versions in the consistency store.
func (w *ownerRecord) EnsureReady(c *RealConsistencyStore) error {
w.versionsLock.Lock()
defer w.versionsLock.Unlock()
for gr, wroteRV := range w.versions {
store, exists := c.stores[gr]
if !exists || store == nil {
continue
}
readRV := store.LastStoreSyncResourceVersion()
if readRV == "" {
// Since we wait for the store to be ready, the only time "" is if the
// LastStoreSyncResourceVersion() feature is not enabled.
continue
}
i, err := resourceversion.CompareResourceVersion(wroteRV, readRV)
if err != nil {
// comparison errors indicate there's a data problem with resource versions, continue so we don't block syncing
continue
}
if i > 0 {
// read version is not as new as owner version, not ready
return &ConsistencyError{
WroteRV: wroteRV,
ReadRV: readRV,
GroupResource: gr,
}
}
}
return nil
}
// NoopConsistencyStore is a consistency store that stores nothing and always
// returns IsReady as true. To be used when the associated feature gate is not
// enabled.
type NoopConsistencyStore struct{}
var _ ConsistencyStore = &NoopConsistencyStore{}
func (*NoopConsistencyStore) WroteAt(owner types.NamespacedName, ownerUID types.UID, resource schema.GroupResource, rv string) {
}
func (*NoopConsistencyStore) ReadAt(resource schema.GroupResource, rv string) {}
func (*NoopConsistencyStore) Clear(owner types.NamespacedName, ownerUID types.UID) {}
func (*NoopConsistencyStore) EnsureReady(owner types.NamespacedName) error {
return nil
}
func NewNoopConsistencyStore() *NoopConsistencyStore {
return &NoopConsistencyStore{}
}

View file

@ -0,0 +1,292 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
)
func TestOwnerRecord_WroteAt(t *testing.T) {
uid := types.UID("owner-uid-1")
or := newOwnerRecord(uid)
assert.Equal(t, uid, or.ownerUID)
require.NotNil(t, or.versions)
grPod := schema.GroupResource{Group: "", Resource: "pods"}
grDs := schema.GroupResource{Group: "apps", Resource: "daemonsets"}
// First write
or.WroteAt(grPod, "5")
assert.Equal(t, "5", or.versions[grPod])
// Second write (higher)
or.WroteAt(grPod, "10")
assert.Equal(t, "10", or.versions[grPod])
// Third write (lower)
or.WroteAt(grPod, "8")
assert.Equal(t, "10", or.versions[grPod])
// Write to different resource
or.WroteAt(grDs, "1")
assert.Equal(t, "1", or.versions[grDs])
assert.Equal(t, "10", or.versions[grPod], "Pod version should be unchanged")
}
func TestOwnerRecord_IsReady(t *testing.T) {
uid := types.UID("owner-uid-1")
or := newOwnerRecord(uid)
grPod := schema.GroupResource{Group: "", Resource: "pods"}
grDs := schema.GroupResource{Group: "apps", Resource: "daemonsets"}
podStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
dsStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
resourceStores := map[schema.GroupResource]LastSyncRVGetter{
grPod: podStore,
grDs: dsStore,
}
store := NewConsistencyStore(resourceStores)
// Case 1: No writes. Should be ready.
require.NoError(t, or.EnsureReady(store), "Should be ready if no writes are recorded")
// Add a write
or.WroteAt(grPod, "10")
// Case 2: Write exists, but no reads. Should stay ready.
require.NoError(t, or.EnsureReady(store), "Should stay ready if write exists and no read")
// Add a read, but it's lower
podStore.Bookmark("5")
// Case 3: Write exists, read is lower. Not ready.
require.Error(t, or.EnsureReady(store), "Not ready if read < write")
// Add a read, equal
podStore.Bookmark("10")
// Case 4: Write exists, read is equal. Ready.
require.NoError(t, or.EnsureReady(store), "Ready if read == write")
// Add a read, higher
podStore.Bookmark("15")
// Case 5: Write exists, read is higher. Ready.
require.NoError(t, or.EnsureReady(store), "Ready if read > write")
// Add a second write and read
or.WroteAt(grDs, "100")
dsStore.Bookmark("50")
// Case 6: One resource ready, one not. Not ready.
require.Error(t, or.EnsureReady(store), "Not ready if one of multiple writes is not ready (no read)")
// Make the second one ready
dsStore.Bookmark("100")
// Case 7: All resources ready. Ready.
require.NoError(t, or.EnsureReady(store), "Ready if all writes are ready")
}
func TestConsistencyStore_New(t *testing.T) {
store := NewConsistencyStore(nil)
require.NotNil(t, store)
require.NotNil(t, store.writes)
assert.Empty(t, store.writes)
}
func TestConsistencyStore_EnsureWrittenRecord(t *testing.T) {
store := NewConsistencyStore(nil)
owner := types.NamespacedName{Name: "owner1"}
uid1 := types.UID("uid-1")
uid2 := types.UID("uid-2")
// Create new
r1 := store.ensureWrittenRecord(owner, uid1)
require.NotNil(t, r1)
assert.Equal(t, uid1, r1.ownerUID)
assert.Same(t, r1, store.writes[owner])
// Get existing with same UID
r2 := store.ensureWrittenRecord(owner, uid1)
assert.Same(t, r1, r2, "Should return existing record for same UID")
// Get existing with different UID (should replace)
r3 := store.ensureWrittenRecord(owner, uid2)
require.NotNil(t, r3)
assert.NotSame(t, r1, r3, "Should be a new record")
assert.Equal(t, uid2, r3.ownerUID)
assert.Same(t, r3, store.writes[owner], "New record should replace old one in map")
assert.Empty(t, r3.versions, "New record should be empty")
// Check that old record is detached
grPod := schema.GroupResource{Group: "", Resource: "pods"}
r1.WroteAt(grPod, "10") // Write to old record
assert.Empty(t, r3.versions, "Write to old record should not affect new record")
}
func TestConsistencyStore_EnsureWrittenRecord_Concurrent(t *testing.T) {
store := NewConsistencyStore(nil)
owner := types.NamespacedName{Name: "owner1"}
uid1 := types.UID("uid-1")
uid2 := types.UID("uid-2")
wg := sync.WaitGroup{}
numGoroutines := 50
// Concurrent creation with same UID
var firstRecord *ownerRecord
var once sync.Once
for range numGoroutines {
wg.Add(1)
go func() {
defer wg.Done()
r := store.ensureWrittenRecord(owner, uid1)
assert.Equal(t, uid1, r.ownerUID)
once.Do(func() {
firstRecord = r
})
assert.Same(t, firstRecord, r)
}()
}
wg.Wait()
require.NotNil(t, firstRecord)
assert.Len(t, store.writes, 1)
// Concurrent replacement with new UID
var replacementRecord *ownerRecord
var replaceOnce sync.Once
for range numGoroutines {
wg.Add(1)
go func() {
defer wg.Done()
r := store.ensureWrittenRecord(owner, uid2)
assert.Equal(t, uid2, r.ownerUID)
replaceOnce.Do(func() {
replacementRecord = r
})
assert.Same(t, replacementRecord, r)
}()
}
wg.Wait()
require.NotNil(t, replacementRecord)
assert.Len(t, store.writes, 1)
assert.Same(t, replacementRecord, store.writes[owner])
assert.NotSame(t, firstRecord, replacementRecord)
}
func TestConsistencyStore_WroteAt(t *testing.T) {
store := NewConsistencyStore(nil)
owner := types.NamespacedName{Name: "owner1"}
uid1 := types.UID("uid-1")
grPod := schema.GroupResource{Group: "", Resource: "pods"}
store.WroteAt(owner, uid1, grPod, "10")
record := store.getWrittenRecord(owner)
require.NotNil(t, record)
assert.Equal(t, uid1, record.ownerUID)
assert.Equal(t, "10", record.versions[grPod])
// Write again
store.WroteAt(owner, uid1, grPod, "20")
assert.Equal(t, "20", record.versions[grPod])
}
func TestConsistencyStore_Clear(t *testing.T) {
store := NewConsistencyStore(nil)
owner1 := types.NamespacedName{Name: "owner1"}
owner2 := types.NamespacedName{Name: "owner2"}
uid1 := types.UID("uid-1")
uid2 := types.UID("uid-2")
// Setup
r1 := store.ensureWrittenRecord(owner1, uid1)
r2 := store.ensureWrittenRecord(owner2, uid2)
require.Len(t, store.writes, 2)
// Clear non-existent
store.Clear(types.NamespacedName{Name: "non-existent"}, uid1)
assert.Len(t, store.writes, 2)
// Clear with wrong UID
store.Clear(owner1, uid2)
assert.Len(t, store.writes, 2, "Should not clear with wrong UID")
assert.Same(t, r1, store.writes[owner1])
// Clear with correct UID
store.Clear(owner1, uid1)
assert.Len(t, store.writes, 1, "Should clear with correct UID")
assert.Nil(t, store.writes[owner1])
assert.Same(t, r2, store.writes[owner2], "Other record should remain")
// Re-add r1
store.ensureWrittenRecord(owner1, uid1)
require.Len(t, store.writes, 2)
// Clear with empty UID
store.Clear(owner1, "")
assert.Len(t, store.writes, 1, "Should clear with empty UID")
assert.Nil(t, store.writes[owner1])
assert.Same(t, r2, store.writes[owner2])
}
func TestConsistencyStore_IsReady(t *testing.T) {
owner1 := types.NamespacedName{Name: "owner1"}
uid1 := types.UID("uid-1")
grPod := schema.GroupResource{Group: "", Resource: "pods"}
podStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
resourceStores := map[schema.GroupResource]LastSyncRVGetter{
grPod: podStore,
}
store := NewConsistencyStore(resourceStores)
// Case 1: No record. Ready.
require.NoError(t, store.EnsureReady(owner1), "Ready if no record exists")
// Add a write and initial read rv
podStore.Bookmark("5")
store.WroteAt(owner1, uid1, grPod, "10")
// Case 2: Record exists, read < write. Not ready.
require.Error(t, store.EnsureReady(owner1), "Not ready if read < write")
// Add read, equal
podStore.Bookmark("10")
// Case 3: Record exists, read == write. Ready.
require.NoError(t, store.EnsureReady(owner1), "Ready if read == write")
// Add read, higher
podStore.Bookmark("15")
// Case 4: Record exists, read > write. Ready.
require.NoError(t, store.EnsureReady(owner1), "Ready if read > write")
// Assert that the record no longer exists, we no longer need to track the
// reads as long as the read has been higher than the latest write.
assert.Nil(t, store.getWrittenRecord(owner1), "Written record should no longer exist")
}

View file

@ -935,6 +935,13 @@ const (
// pod's lifecycle and will not block pod termination.
SidecarContainers featuregate.Feature = "SidecarContainers"
// owner: @michaelasp
// kep: http://kep.k8s.io/5647
//
// Introduces the ability for the DaemonSet controller to be able to read its writes
// prior to running a reconcile on the same object.
StaleControllerConsistencyDaemonSet featuregate.Feature = "StaleControllerConsistencyDaemonSet"
// owner: @liggitt
//
// Mitigates spurious statefulset rollouts due to controller revision comparison mismatches
@ -1766,6 +1773,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: true, LockToDefault: true, PreRelease: featuregate.GA}, // GA in 1.33 remove in 1.36
},
StaleControllerConsistencyDaemonSet: {
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
StatefulSetSemanticRevisionComparison: {
// This is a mitigation for a 1.34 regression due to serialization differences that cannot be feature-gated,
// so this mitigation should not auto-disable even if emulating versions prior to 1.34 with --emulation-version.
@ -2374,6 +2385,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
SidecarContainers: {},
StaleControllerConsistencyDaemonSet: {featuregate.Feature(clientfeatures.AtomicFIFO)},
StatefulSetSemanticRevisionComparison: {},
StorageCapacityScoring: {},
@ -2508,10 +2521,6 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
}
func init() {
runtime.Must(utilfeature.DefaultMutableFeatureGate.AddVersioned(defaultVersionedKubernetesFeatureGates))
runtime.Must(utilfeature.DefaultMutableFeatureGate.AddDependencies(defaultKubernetesFeatureGateDependencies))
runtime.Must(zpagesfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
// Register all client-go features with kube's feature gate instance and make all client-go
// feature checks use kube's instance. The effect is that for kube binaries, client-go
// features are wired to the existing --feature-gates flag just as all other features
@ -2520,4 +2529,8 @@ func init() {
ca := &clientAdapter{utilfeature.DefaultMutableFeatureGate}
runtime.Must(clientfeatures.AddVersionedFeaturesToExistingFeatureGates(ca))
clientfeatures.ReplaceFeatureGates(ca)
runtime.Must(utilfeature.DefaultMutableFeatureGate.AddVersioned(defaultVersionedKubernetesFeatureGates))
runtime.Must(utilfeature.DefaultMutableFeatureGate.AddDependencies(defaultKubernetesFeatureGateDependencies))
runtime.Must(zpagesfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
}

View file

@ -178,6 +178,7 @@
| ServiceAccountTokenPodNodeInfo | :ballot_box_with_check:&nbsp;1.30+ | :closed_lock_with_key:&nbsp;1.32+ | 1.29 | 1.301.31 | 1.32 | | | [code](https://cs.k8s.io/?q=%5CbServiceAccountTokenPodNodeInfo%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbServiceAccountTokenPodNodeInfo%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| SidecarContainers | :ballot_box_with_check:&nbsp;1.29+ | :closed_lock_with_key:&nbsp;1.33+ | 1.28 | 1.291.32 | 1.33 | | | [code](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSidecarContainers%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| SizeBasedListCostEstimate | :ballot_box_with_check:&nbsp;1.34+ | | | 1.34 | | | | [code](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StaleControllerConsistencyDaemonSet | :ballot_box_with_check:&nbsp;1.36+ | | | 1.36 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StatefulSetSemanticRevisionComparison | :ballot_box_with_check:&nbsp;1.0+ | | | 1.0 | | | | [code](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StorageCapacityScoring | | | 1.33 | | | | | [code](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StorageNamespaceIndex | :ballot_box_with_check:&nbsp;1.30+ | | | 1.301.32 | | 1.33 | | [code](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |

View file

@ -1709,6 +1709,12 @@
lockToDefault: false
preRelease: Beta
version: "1.34"
- name: StaleControllerConsistencyDaemonSet
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: StatefulSetSemanticRevisionComparison
versionedSpecs:
- default: true