diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index bff1e6401cf..c847ad4f229 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -53,6 +53,7 @@ import ( hashutil "k8s.io/kubernetes/pkg/util/hash" taintutils "k8s.io/kubernetes/pkg/util/taints" "k8s.io/utils/clock" + "k8s.io/utils/ptr" "k8s.io/klog/v2" ) @@ -1030,6 +1031,60 @@ func CountTerminatingPods(pods []*v1.Pod) int32 { return int32(numberOfTerminatingPods) } +// nextPodAvailabilityCheck implements similar logic to podutil.IsPodAvailable +func nextPodAvailabilityCheck(pod *v1.Pod, minReadySeconds int32, now time.Time) *time.Duration { + if !podutil.IsPodReady(pod) || minReadySeconds <= 0 { + return nil + } + + c := podutil.GetPodReadyCondition(pod.Status) + if c.LastTransitionTime.IsZero() { + return nil + } + minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second + nextCheck := c.LastTransitionTime.Add(minReadySecondsDuration).Sub(now) + if nextCheck > 0 { + return ptr.To(nextCheck) + } + return nil +} + +// findMinNextPodAvailabilitySimpleCheck finds a duration when the next availability check should occur. It also returns the +// first pod affected by the future availability recalculation (there might be more pods if they became ready at the same time; +// this helps to implement FindMinNextPodAvailabilityCheck). +func findMinNextPodAvailabilitySimpleCheck(pods []*v1.Pod, minReadySeconds int32, now time.Time) (*time.Duration, *v1.Pod) { + var minAvailabilityCheck *time.Duration + var checkPod *v1.Pod + for _, p := range pods { + nextCheck := nextPodAvailabilityCheck(p, minReadySeconds, now) + if nextCheck != nil && (minAvailabilityCheck == nil || *nextCheck < *minAvailabilityCheck) { + minAvailabilityCheck = nextCheck + checkPod = p + } + } + return minAvailabilityCheck, checkPod +} + +// FindMinNextPodAvailabilityCheck finds a duration when the next availability check should occur. +// We should check for the availability at the same time as the status evaluation/update occurs (e.g. .status.availableReplicas) by +// passing lastOwnerStatusEvaluation. This ensures that we will not skip any pods that might become available +// (findMinNextPodAvailabilitySimpleCheck would return nil in the future time), since the owner status evaluation. +// clock is then used to calculate the precise time for the next availability check. +func FindMinNextPodAvailabilityCheck(pods []*v1.Pod, minReadySeconds int32, lastOwnerStatusEvaluation time.Time, clock clock.PassiveClock) *time.Duration { + nextCheckAccordingToOwnerStatusEvaluation, checkPod := findMinNextPodAvailabilitySimpleCheck(pods, minReadySeconds, lastOwnerStatusEvaluation) + if nextCheckAccordingToOwnerStatusEvaluation == nil || checkPod == nil { + return nil + } + // There must be a nextCheck. We try to calculate a more precise value for the next availability check. + // Check the earliest pod to avoid being preempted by a later pod. + if updatedNextCheck := nextPodAvailabilityCheck(checkPod, minReadySeconds, clock.Now()); updatedNextCheck != nil { + // There is a delay since the last Now() call (lastOwnerStatusEvaluation). Use the updatedNextCheck. + return updatedNextCheck + } + // Fall back to 0 (immediate check) in case the last nextPodAvailabilityCheck call (with a refreshed Now) returns nil, as we might be past the check. + return ptr.To(time.Duration(0)) +} + func IsPodActive(p *v1.Pod) bool { return v1.PodSucceeded != p.Status.Phase && v1.PodFailed != p.Status.Phase && diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index d6bc919a6df..819d44be1e5 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -881,6 +881,244 @@ func TestSortingActivePodsWithRanks(t *testing.T) { } } +func TestNextPodAvailabilityCheck(t *testing.T) { + newPodWithReadyCond := func(now metav1.Time, ready bool, beforeSec int) *v1.Pod { + conditionStatus := v1.ConditionFalse + if ready { + conditionStatus = v1.ConditionTrue + } + return &v1.Pod{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + LastTransitionTime: metav1.NewTime(now.Add(-1 * time.Duration(beforeSec) * time.Second)), + Status: conditionStatus, + }, + }, + }, + } + } + + now := metav1.Now() + tests := []struct { + name string + pod *v1.Pod + minReadySeconds int32 + expected *time.Duration + }{ + { + name: "not ready", + pod: newPodWithReadyCond(now, false, 0), + minReadySeconds: 0, + expected: nil, + }, + { + name: "no minReadySeconds defined", + pod: newPodWithReadyCond(now, true, 0), + minReadySeconds: 0, + expected: nil, + }, + { + name: "lastTransitionTime is zero", + pod: func() *v1.Pod { + pod := newPodWithReadyCond(now, true, 0) + pod.Status.Conditions[0].LastTransitionTime = metav1.Time{} + return pod + }(), + minReadySeconds: 1, + expected: nil, + }, + { + name: "just became ready - available in 1s", + pod: newPodWithReadyCond(now, true, 0), + minReadySeconds: 1, + expected: ptr.To(time.Second), + }, + { + name: "ready for 20s - available in 10s", + pod: newPodWithReadyCond(now, true, 20), + minReadySeconds: 30, + expected: ptr.To(10 * time.Second), + }, + { + name: "available", + pod: newPodWithReadyCond(now, true, 51), + minReadySeconds: 50, + expected: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + nextAvailable := nextPodAvailabilityCheck(test.pod, test.minReadySeconds, now.Time) + if !ptr.Equal(nextAvailable, test.expected) { + t.Errorf("expected next pod availability check: %v, got: %v", test.expected, nextAvailable) + } + }) + } +} + +func TestFindMinNextPodAvailabilitySimpleCheck(t *testing.T) { + now := metav1.Now() + + pod := func(name string, ready bool, beforeSec int) *v1.Pod { + p := testutil.NewPod(name, "node0") + if ready { + p.Status.Conditions[0].LastTransitionTime = metav1.NewTime(now.Add(-1 * time.Duration(beforeSec) * time.Second)) + } else { + p.Status.Conditions[0].Status = v1.ConditionFalse + } + return p + } + + tests := []struct { + name string + pods []*v1.Pod + minReadySeconds int32 + expected *time.Duration + expectedPod *string + }{ + { + name: "no pods", + pods: nil, + minReadySeconds: 0, + expected: nil, + expectedPod: nil, + }, + { + name: "unready pods", + pods: []*v1.Pod{ + pod("pod1", false, 0), + pod("pod2", false, 0), + }, + minReadySeconds: 0, + expected: nil, + expectedPod: nil, + }, + { + name: "ready pods with no minReadySeconds", + pods: []*v1.Pod{ + pod("pod1", true, 0), + pod("pod2", true, 0), + }, + minReadySeconds: 0, + expected: nil, + expectedPod: nil, + }, + { + name: "unready and ready pods should find min next availability check", + pods: []*v1.Pod{ + pod("pod1", false, 0), + pod("pod2", true, 2), + pod("pod3", true, 0), + pod("pod4", true, 4), + pod("pod5", false, 0), + }, + minReadySeconds: 10, + expected: ptr.To(6 * time.Second), + expectedPod: ptr.To("pod4"), + }, + { + name: "unready and available pods do not require min next availability check", // only after pods become ready we can schedule one + pods: []*v1.Pod{ + pod("pod1", false, 0), + pod("pod2", true, 15), + pod("pod3", true, 11), + pod("pod4", true, 10), + pod("pod5", false, 0), + }, + minReadySeconds: 10, + expected: nil, + expectedPod: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + nextAvailable, checkPod := findMinNextPodAvailabilitySimpleCheck(test.pods, test.minReadySeconds, now.Time) + var checkPodName *string + if checkPod != nil { + checkPodName = ptr.To(checkPod.Name) + } + if !ptr.Equal(nextAvailable, test.expected) { + t.Errorf("expected next min pod availability check: %v, got: %v", test.expected, nextAvailable) + } + if !ptr.Equal(checkPodName, test.expectedPod) { + t.Errorf("expected next min pod availability check for pod: %v, got: %v", test.expectedPod, checkPodName) + } + + // using the same now for status evaluation and the clock should return the same result as findMinNextPodAvailabilitySimpleCheck + nextAvailable = FindMinNextPodAvailabilityCheck(test.pods, test.minReadySeconds, now.Time, testingclock.NewFakeClock(now.Time)) + + if !ptr.Equal(nextAvailable, test.expected) { + t.Errorf("expected next min pod availability check when status evaluation and clock is now: %v, got: %v", test.expected, nextAvailable) + } + }) + } +} + +func TestFindMinNextPodAvailability(t *testing.T) { + now := metav1.Now() + + pod := func(name string, ready bool, beforeSec int) *v1.Pod { + p := testutil.NewPod(name, "node0") + if ready { + p.Status.Conditions[0].LastTransitionTime = metav1.NewTime(now.Add(-1 * time.Duration(beforeSec) * time.Second)) + } else { + p.Status.Conditions[0].Status = v1.ConditionFalse + } + return p + } + + tests := []struct { + name string + pods []*v1.Pod + minReadySeconds int32 + statusEvaluationDelaySeconds int + expected *time.Duration + }{ + { + name: "unready and ready pods should find min next availability check considering status evaluation/update delay", + pods: []*v1.Pod{ + pod("pod1", false, 0), + pod("pod2", true, 2), + pod("pod3", true, 0), + pod("pod4", true, 4), + pod("pod5", false, 0), + }, + minReadySeconds: 10, + statusEvaluationDelaySeconds: 2, // total is 4+2 since the pod4 became ready + expected: ptr.To(4 * time.Second), + }, + { + name: "unready and ready pods should find min next availability check even if the status evaluation delay is longer than minReadySeconds", + pods: []*v1.Pod{ + pod("pod1", false, 0), + pod("pod2", true, 2), + pod("pod3", true, 0), + pod("pod4", true, 4), + pod("pod5", false, 0), + }, + minReadySeconds: 10, + statusEvaluationDelaySeconds: 7, // total is 4+7 since the pod4 became ready + expected: ptr.To(0 * time.Second), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + oldNow := now.Time + newNow := testingclock.NewFakePassiveClock(now.Add(time.Duration(test.statusEvaluationDelaySeconds) * time.Second)) + nextAvailable := FindMinNextPodAvailabilityCheck(test.pods, test.minReadySeconds, oldNow, newNow) + + if !ptr.Equal(nextAvailable, test.expected) { + t.Errorf("expected next min pod availability check: %v, got: %v", test.expected, nextAvailable) + } + }) + } +} + func TestActiveReplicaSetsFiltering(t *testing.T) { rsUuid := uuid.NewUUID() diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index d53d762ff9b..8934bd46055 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -497,13 +497,14 @@ func (rsc *ReplicaSetController) updatePod(logger klog.Logger, old, cur interfac // having its status updated with the newly available replica. For now, we can fake the // update by resyncing the controller MinReadySeconds after the it is requeued because // a Pod transitioned to Ready. + // If there are multiple pods with varying readiness times, we cannot correctly track it + // with the current queue. Further resyncs are attempted at the end of the syncReplicaSet + // function. // Note that this still suffers from #29229, we are just moving the problem one level // "closer" to kubelet (from the deployment to the replica set controller). if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 { logger.V(2).Info("pod will be enqueued after a while for availability check", "duration", rs.Spec.MinReadySeconds, "kind", rsc.Kind, "pod", klog.KObj(oldPod)) - // Add a second to avoid milliseconds skew in AddAfter. - // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. - rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) + rsc.enqueueRSAfter(rs, time.Duration(rs.Spec.MinReadySeconds)*time.Second) } return } @@ -747,12 +748,14 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) } var manageReplicasErr error - var nextSyncInSeconds *int + var nextSyncDuration *time.Duration if rsNeedsSync && rs.DeletionTimestamp == nil { manageReplicasErr = rsc.manageReplicas(ctx, activePods, rs) } rs = rs.DeepCopy() - newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr, rsc.controllerFeatures, rsc.clock) + // Use the same time for calculating status and nextSyncDuration. + now := rsc.clock.Now() + newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr, rsc.controllerFeatures, now) // Always updates status as pods come up or die. updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus, rsc.controllerFeatures) @@ -764,14 +767,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) if manageReplicasErr != nil { return manageReplicasErr } - // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew. + // Plan the next availability check as a last line of defense against queue preemption (we have one queue key for checking availability of all the pods) + // or early sync (see https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info). if updatedRS.Spec.MinReadySeconds > 0 && - updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && - updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { - nextSyncInSeconds = ptr.To(int(updatedRS.Spec.MinReadySeconds)) + updatedRS.Status.ReadyReplicas != updatedRS.Status.AvailableReplicas { + // Safeguard fallback to the .spec.minReadySeconds to ensure that we always end up with .status.availableReplicas updated. + nextSyncDuration = ptr.To(time.Duration(updatedRS.Spec.MinReadySeconds) * time.Second) + // Use the same point in time (now) for calculating status and nextSyncDuration to get matching availability for the pods. + if nextCheck := controller.FindMinNextPodAvailabilityCheck(activePods, updatedRS.Spec.MinReadySeconds, now, rsc.clock); nextCheck != nil { + nextSyncDuration = nextCheck + } } - if nextSyncInSeconds != nil { - rsc.queue.AddAfter(key, time.Duration(*nextSyncInSeconds)*time.Second) + if nextSyncDuration != nil { + rsc.queue.AddAfter(key, *nextSyncDuration) } return nil } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index a54572c6975..9637adce8dc 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "github.com/onsi/gomega" "math/rand" "net/http/httptest" "net/url" @@ -1618,6 +1619,87 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { } } +func TestReplicaSetAvailabilityCheck(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(4, labelMap) + rs.Spec.MinReadySeconds = 5 + client := fake.NewClientset(rs) + stopCh := make(chan struct{}) + defer close(stopCh) + + manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas) + if err := informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs); err != nil { + t.Fatal(err) + } + + now := time.Now() + pod1 := newPod("foobar-1", rs, v1.PodPending, nil, true) + pod2 := newPod("foobar-2", rs, v1.PodRunning, &metav1.Time{Time: now}, true) + pod3 := newPod("foobar-3", rs, v1.PodRunning, &metav1.Time{Time: now.Add(-2 * time.Second)}, true) + pod4 := newPod("foobar-4", rs, v1.PodRunning, &metav1.Time{Time: now.Add(-4300 * time.Millisecond)}, true) + if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1); err != nil { + t.Fatal(err) + } + if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod2); err != nil { + t.Fatal(err) + } + if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod3); err != nil { + t.Fatal(err) + } + if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod4); err != nil { + t.Fatal(err) + } + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + + err := manager.syncReplicaSet(ctx, GetKey(rs, t)) + if err != nil { + t.Fatal(err) + } + + var updatedRs *apps.ReplicaSet + for _, a := range client.Actions() { + if a.GetResource().Resource != "replicasets" { + t.Errorf("Unexpected action %+v", a) + continue + } + + switch action := a.(type) { + case core.UpdateAction: + var ok bool + if updatedRs, ok = action.GetObject().(*apps.ReplicaSet); !ok { + t.Errorf("Expected a ReplicaSet as the argument to update, got %T", updatedRs) + } + default: + t.Errorf("Unexpected action %+v", a) + } + } + + // one pod is not ready + if updatedRs.Status.ReadyReplicas != 3 { + t.Errorf("Expected updated ReplicaSet to contain ready replicas %v, got %v instead", + 3, updatedRs.Status.ReadyReplicas) + } + if updatedRs.Status.AvailableReplicas != 0 { + t.Errorf("Expected updated ReplicaSet to contain available replicas %v, got %v instead", + 0, updatedRs.Status.AvailableReplicas) + } + + if got, want := manager.queue.Len(), 0; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } + + // RS should be re-queued after 700ms to recompute .status.availableReplicas (200ms extra for the test). + ktesting.Eventually(ctx, func(tCtx ktesting.TContext) int { + return manager.queue.Len() + }).WithTimeout(900*time.Millisecond). + WithPolling(10*time.Millisecond). + Should(gomega.Equal(1), " RS should be re-queued to recompute .status.availableReplicas") + +} + var ( imagePullBackOff apps.ReplicaSetConditionType = "ImagePullBackOff" diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go index 738d955c42b..4aa35e08bd4 100644 --- a/pkg/controller/replicaset/replica_set_utils.go +++ b/pkg/controller/replicaset/replica_set_utils.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "reflect" + "time" "k8s.io/klog/v2" @@ -33,7 +34,6 @@ import ( appsclient "k8s.io/client-go/kubernetes/typed/apps/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" - "k8s.io/utils/clock" "k8s.io/utils/ptr" ) @@ -93,7 +93,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface return nil, updateErr } -func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error, controllerFeatures ReplicaSetControllerFeatures, clock clock.PassiveClock) apps.ReplicaSetStatus { +func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error, controllerFeatures ReplicaSetControllerFeatures, now time.Time) apps.ReplicaSetStatus { newStatus := rs.Status // Count the number of pods that have labels matching the labels of the pod // template of the replica set, the matching pods may have more @@ -104,7 +104,6 @@ func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods readyReplicasCount := 0 availableReplicasCount := 0 templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated() - now := clock.Now() for _, pod := range activePods { if templateLabel.Matches(labels.Set(pod.Labels)) { fullyLabeledReplicasCount++ diff --git a/pkg/controller/replicaset/replica_set_utils_test.go b/pkg/controller/replicaset/replica_set_utils_test.go index 856a7dbcc82..47315b809a8 100644 --- a/pkg/controller/replicaset/replica_set_utils_test.go +++ b/pkg/controller/replicaset/replica_set_utils_test.go @@ -303,7 +303,7 @@ func TestCalculateStatus(t *testing.T) { // test ReplicaSet controller default behavior unless specified otherwise in the test case controllerFeatures := ptr.Deref(test.controllerFeatures, DefaultReplicaSetControllerFeatures()) - replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil, controllerFeatures, clock) + replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil, controllerFeatures, clock.Now()) if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) { t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetStatus, replicaSetStatus) } @@ -400,8 +400,7 @@ func TestCalculateStatusConditions(t *testing.T) { for _, test := range rsStatusConditionTests { t.Run(test.name, func(t *testing.T) { - clock := clocktesting.NewFakePassiveClock(time.Now()) - replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr, DefaultReplicaSetControllerFeatures(), clock) + replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr, DefaultReplicaSetControllerFeatures(), time.Now()) // all test cases have at most 1 status condition if len(replicaSetStatus.Conditions) > 0 { test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime