Merge pull request #138698 from michaelasp/circuitBreaker

Ensure leases are not stale in node controller before marking unhealthy
This commit is contained in:
Kubernetes Prow Robot 2026-05-05 23:34:21 +05:30 committed by GitHub
commit 74f4ad5e38
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 293 additions and 2 deletions

View file

@ -23,6 +23,7 @@ package nodelifecycle
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -35,6 +36,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -56,6 +59,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/tainteviction"
consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/features"
taintutils "k8s.io/kubernetes/pkg/util/taints"
@ -111,6 +115,8 @@ var (
v1.TaintNodeDiskPressure: v1.NodeDiskPressure,
v1.TaintNodePIDPressure: v1.NodePIDPressure,
}
leaseResource = coordv1.SchemeGroupVersion.WithResource("leases").GroupResource()
)
// ZoneState is the state of a given zone.
@ -248,8 +254,10 @@ type Controller struct {
leaseLister coordlisters.LeaseLister
leaseInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
consistencyStore consistencyutil.ConsistencyStore
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
@ -423,6 +431,14 @@ func NewNodeLifecycleController(
nc.daemonSetStore = daemonSetInformer.Lister()
nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
if utilfeature.DefaultFeatureGate.Enabled(features.NodeControllerLeaseCircuitBreaker) {
nc.consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{
leaseResource: leaseInformer.Informer().GetStore(),
})
} else {
nc.consistencyStore = consistencyutil.NewNoopConsistencyStore()
}
return nc, nil
}
@ -645,6 +661,15 @@ func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context) {
}
}
// shortCircuitError is a marker type to signal the polling loop in monitorNodeHealth should return early with error
type shortCircuitError struct {
error
}
func (n shortCircuitError) Unwrap() error {
return n.error
}
// monitorNodeHealth verifies node health are constantly updated by kubelet, and if not, post "NodeReady==ConditionUnknown".
// This function will
// - add nodes which are not ready or not reachable for a long period of time to a rate-limited
@ -699,6 +724,12 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
if err == nil {
return true, nil
}
// If the error is due to a short circuit, don't retry.
if utilfeature.DefaultFeatureGate.Enabled(features.NodeControllerLeaseCircuitBreaker) {
if err, ok := errors.AsType[shortCircuitError](err); ok {
return false, err.error
}
}
name := node.Name
node, err = nc.kubeClient.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
if err != nil {
@ -816,6 +847,16 @@ func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (t
nc.nodeHealthMap.set(node.Name, nodeHealth)
}()
if utilfeature.DefaultFeatureGate.Enabled(features.NodeControllerLeaseCircuitBreaker) {
// This is a controller-wide consistency check, if the lease cache is stale, then we cannot
// trust any node leases until the cache has caught back up.
err := nc.consistencyStore.EnsureReady(types.NamespacedName{})
// Skip processing this node in this cycle if the node controller cache is not ready yet.
if err != nil {
return 0, v1.NodeCondition{}, nil, shortCircuitError{err}
}
}
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
_, currentReadyCondition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
@ -919,6 +960,35 @@ func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (t
}
if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
if utilfeature.DefaultFeatureGate.Enabled(features.NodeControllerLeaseCircuitBreaker) {
var nodeHealthLeaseRV string
if nodeHealth.lease != nil {
nodeHealthLeaseRV = nodeHealth.lease.ResourceVersion
}
// The lease instance in the informer cache indicates it is expired.
// Double-check the live lease is actually expired in case our informer cache is stale.
liveLease, err := nc.kubeClient.CoordinationV1().Leases(v1.NamespaceNodeLease).Get(ctx, node.Name, metav1.GetOptions{})
if err == nil {
if liveLease.ResourceVersion != nodeHealthLeaseRV {
nc.consistencyStore.WroteAt(
// This is a controller wide consistency check, if the lease cache is stale
// we need to wait for the cache to catch up before processing any nodes.
types.NamespacedName{},
"", // No specific UID for generic name
leaseResource,
liveLease.ResourceVersion,
)
return 0, v1.NodeCondition{}, nil, shortCircuitError{&consistencyutil.ConsistencyError{
ReadRV: nodeHealthLeaseRV,
WroteRV: liveLease.ResourceVersion,
GroupResource: leaseResource,
}}
}
} else if !apierrors.IsNotFound(err) {
return 0, v1.NodeCondition{}, nil, shortCircuitError{fmt.Errorf("error looking up lease to verify node %s: %w", node.Name, err)}
}
}
// NodeReady condition or lease was last set longer ago than gracePeriod, so
// update it to Unknown (regardless of its current value) in the master.

View file

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1"
coordinformers "k8s.io/client-go/informers/coordination/v1"
@ -40,11 +41,13 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeletapis "k8s.io/kubelet/pkg/apis"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/testutil"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/node"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/test/utils/ktesting"
@ -1283,6 +1286,9 @@ func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
nodeCreationTime := metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeControllerLeaseCircuitBreaker, true)
testcases := []struct {
description string
fakeNodeHandler *testutil.FakeNodeHandler
@ -1290,6 +1296,8 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
timeToPass time.Duration
newNodeStatus v1.NodeStatus
newLease *coordv1.Lease
apiServerLease *coordv1.Lease
newAPIServerLease *coordv1.Lease
expectedRequestCount int
expectedNodes []*v1.Node
expectedPodStatusUpdate bool
@ -1767,6 +1775,180 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
},
expectedPodStatusUpdate: true,
},
// Node created long time ago, with status updated by kubelet exceeds grace period.
// Node lease is expired in cache, but fresh in API server.
// Expect no action from node controller (within monitor grace period) because of double check.
{
description: "Node created long time ago, with status updated by kubelet exceeds grace period. Node lease is expired in cache, but fresh in API server.",
fakeNodeHandler: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: nodeCreationTime,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
},
lease: createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time)),
timeToPass: time.Hour,
newNodeStatus: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
newLease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time))
l.ResourceVersion = "50"
return l
}(),
apiServerLease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time))
l.ResourceVersion = "50"
return l
}(),
newAPIServerLease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time.Add(time.Hour)))
l.ResourceVersion = "100"
return l
}(),
expectedRequestCount: 2, // List nodes + List nodes + GET lease (GET not counted)
expectedNodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: nodeCreationTime,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
},
},
expectedPodStatusUpdate: false,
},
// Node lease expired in cache but fresh in API server. EnsureReady should skip on second run.
{
description: "Node lease expired in cache but fresh in API server. EnsureReady should skip on second run.",
fakeNodeHandler: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: nodeCreationTime,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
},
lease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time.Add(-60*time.Second)))
l.ResourceVersion = "50"
return l
}(),
timeToPass: 1 * time.Second,
newNodeStatus: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
newLease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time.Add(-60*time.Second)))
l.ResourceVersion = "50"
return l
}(),
apiServerLease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time))
l.ResourceVersion = "100"
return l
}(),
newAPIServerLease: func() *coordv1.Lease {
l := createNodeLease("node0", metav1.NewMicroTime(fakeNow.Time))
l.ResourceVersion = "100"
return l
}(),
expectedRequestCount: 2, // List nodes + List nodes (GET lease not counted)
expectedNodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: nodeCreationTime,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
},
},
},
},
expectedPodStatusUpdate: false,
},
}
for _, item := range testcases {
@ -1792,6 +1974,12 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
if err := nodeController.syncLeaseStore(item.lease); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if item.apiServerLease != nil {
_, err := item.fakeNodeHandler.CoordinationV1().Leases(v1.NamespaceNodeLease).Create(tCtx, item.apiServerLease, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unexpected error creating lease in fake client: %v", err)
}
}
if err := nodeController.monitorNodeHealth(tCtx); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -1804,6 +1992,12 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
if err := nodeController.syncLeaseStore(item.newLease); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if item.newAPIServerLease != nil {
_, err := item.fakeNodeHandler.CoordinationV1().Leases(v1.NamespaceNodeLease).Update(tCtx, item.newAPIServerLease, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error updating lease in fake client: %v", err)
}
}
if err := nodeController.monitorNodeHealth(tCtx); err != nil {
t.Fatalf("unexpected error: %v", err)
}

View file

@ -670,6 +670,11 @@ const (
// Allows running kube-proxy with `--mode nftables`.
NFTablesProxyMode featuregate.Feature = "NFTablesProxyMode"
// owner: @michaelasp
//
// Gate for Node Lifecycle Controller to ensure that the Lease object actually is stale before marking a node unhealthy.
NodeControllerLeaseCircuitBreaker featuregate.Feature = "NodeControllerLeaseCircuitBreaker"
// owner: @pravk03, @tallclair
// kep: https://kep.k8s.io/5328
//
@ -1682,6 +1687,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
},
NodeControllerLeaseCircuitBreaker: {
{Version: version.MustParse("1.37"), Default: true, PreRelease: featuregate.Beta},
},
NodeDeclaredFeatures: {
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
@ -2514,6 +2523,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
NFTablesProxyMode: {},
NodeControllerLeaseCircuitBreaker: {featuregate.Feature(clientfeatures.AtomicFIFO)},
NodeDeclaredFeatures: {},
NodeInclusionPolicyInPodTopologySpread: {},

View file

@ -326,6 +326,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
// used for pod deletion
rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
rbacv1helpers.NewRule("list", "watch", "get", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(coordinationGroup).Resources("leases").RuleOrDie(),
eventsRule(),
},
}

View file

@ -962,6 +962,14 @@ items:
- get
- list
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- apiGroups:
- ""
- events.k8s.io

View file

@ -134,6 +134,7 @@
| MutableSchedulingDirectivesForSuspendedJobs | :ballot_box_with_check: 1.36+ | | 1.35 | 1.36 | | | | [code](https://cs.k8s.io/?q=%5CbMutableSchedulingDirectivesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutableSchedulingDirectivesForSuspendedJobs%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| MutatingAdmissionPolicy | :ballot_box_with_check: 1.36+ | | 1.321.33 | 1.341.35 | 1.36 | | | [code](https://cs.k8s.io/?q=%5CbMutatingAdmissionPolicy%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbMutatingAdmissionPolicy%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| NFTablesProxyMode | :ballot_box_with_check: 1.31+ | :closed_lock_with_key: 1.33+ | 1.291.30 | 1.311.32 | 1.33 | | | [code](https://cs.k8s.io/?q=%5CbNFTablesProxyMode%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNFTablesProxyMode%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| NodeControllerLeaseCircuitBreaker | :ballot_box_with_check: 1.37+ | | | 1.37 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbNodeControllerLeaseCircuitBreaker%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNodeControllerLeaseCircuitBreaker%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| NodeDeclaredFeatures | :ballot_box_with_check: 1.36+ | | 1.35 | 1.36 | | | | [code](https://cs.k8s.io/?q=%5CbNodeDeclaredFeatures%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNodeDeclaredFeatures%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| NodeInclusionPolicyInPodTopologySpread | :ballot_box_with_check: 1.26+ | :closed_lock_with_key: 1.33+ | 1.25 | 1.261.32 | 1.33 | | | [code](https://cs.k8s.io/?q=%5CbNodeInclusionPolicyInPodTopologySpread%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNodeInclusionPolicyInPodTopologySpread%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| NodeLogQuery | :ballot_box_with_check: 1.36+ | :closed_lock_with_key: 1.36+ | 1.271.29 | 1.301.35 | 1.36 | | | [code](https://cs.k8s.io/?q=%5CbNodeLogQuery%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbNodeLogQuery%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |

View file

@ -1249,6 +1249,12 @@
lockToDefault: true
preRelease: GA
version: "1.33"
- name: NodeControllerLeaseCircuitBreaker
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.37"
- name: NodeDeclaredFeatures
versionedSpecs:
- default: false