diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 531e1e14284..11aa7bb5237 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -128,6 +128,11 @@ const ( // Enables kubelet to detect CSI volume condition and send the event of the abnormal volume to the corresponding pod that is using it. CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth" + // owner: @HirazawaUi + // + // Enabling this feature gate will cause the pod's status to change due to a kubelet restart. + ChangeContainerStatusOnKubeletRestart = "ChangeContainerStatusOnKubeletRestart" + // owner: @sanposhiho @wojtek-t // kep: https://kep.k8s.io/5278 // @@ -1107,6 +1112,11 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.21"), Default: false, PreRelease: featuregate.Alpha}, }, + ChangeContainerStatusOnKubeletRestart: { + {Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA}, + {Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Deprecated}, + }, + ClearingNominatedNodeNameAfterBinding: { {Version: version.MustParse("1.34"), Default: false, PreRelease: featuregate.Alpha}, }, @@ -2042,6 +2052,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature CSIVolumeHealth: {}, + ChangeContainerStatusOnKubeletRestart: {}, + ClearingNominatedNodeNameAfterBinding: {}, ClusterTrustBundle: {}, diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 3e8928b5095..67791e17109 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -2409,6 +2409,14 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon oldStatusPtr = &oldStatus } status := convertContainerStatus(cStatus, oldStatusPtr) + if !utilfeature.DefaultFeatureGate.Enabled(features.ChangeContainerStatusOnKubeletRestart) { + if cStatus.State == kubecontainer.ContainerStateRunning { + if oldStatus, 
ok := oldStatuses[status.Name]; ok && oldStatus.Started != nil { + status.Started = oldStatus.Started + } + } + } + if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { allocatedContainer := kubecontainer.GetContainerSpec(pod, cName) if allocatedContainer != nil { diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index b390d5f482e..85737b6ad43 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -18,16 +18,19 @@ package prober import ( "context" + "sync" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" @@ -278,6 +281,10 @@ func (m *manager) isContainerStarted(pod *v1.Pod, containerStatus *v1.ContainerS return result == results.Success } + if !utilfeature.DefaultFeatureGate.Enabled(features.ChangeContainerStatusOnKubeletRestart) && containerStatus.Started != nil && *containerStatus.Started { + return true + } + // if there is a startup probe which hasn't run yet, the container is not // started. if _, exists := m.getWorker(pod.UID, containerStatus.Name, startup); exists { @@ -288,6 +295,40 @@ func (m *manager) isContainerStarted(pod *v1.Pod, containerStatus *v1.ContainerS return true } +// setReadyStateOnKubeletRestart sets the ready state of a container to false if it was started +// before kubelet restarted and has a readiness probe, but the pod is not ready yet. +// This is to avoid flapping ready status of containers that were ready before kubelet restarted. 
+func (m *manager) setReadyStateOnKubeletRestart(ready *bool, pod *v1.Pod, containerStatus *v1.ContainerStatus, containerSpec *v1.Container) { + var containerStartTime time.Time + if containerStatus.State.Running != nil { + containerStartTime = containerStatus.State.Running.StartedAt.Time + } + + if !containerStartTime.IsZero() && containerStartTime.Before(kubeletRestartGracePeriod(m.start)) { + // At this point, the Pod may be in one of the following two states: + // - It has not yet been added to the readinessManager. In this case, we directly set the container status to Ready. + // - It has been added to the readinessManager, but the probe has not yet started execution. + // Therefore, in this case, we also need to set the container status to Ready. + if !*ready { + if _, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(containerStatus.ContainerID)); !ok { + *ready = true + } + } + if containerSpec.ReadinessProbe != nil { + podIsReady := false + for _, c := range pod.Status.Conditions { + if c.Type == v1.PodReady && c.Status == v1.ConditionTrue { + podIsReady = true + break + } + } + if !podIsReady { + *ready = false + } + } + } +} + func (m *manager) UpdatePodStatus(ctx context.Context, pod *v1.Pod, podStatus *v1.PodStatus) { logger := klog.FromContext(ctx) for i, c := range podStatus.ContainerStatuses { @@ -315,6 +356,20 @@ func (m *manager) UpdatePodStatus(ctx context.Context, pod *v1.Pod, podStatus *v logger.Info("Failed to trigger a manual run", "probe", w.probeType.String()) } } + + if !utilfeature.DefaultFeatureGate.Enabled(features.ChangeContainerStatusOnKubeletRestart) { + // Find the container spec for the container status. 
+ var containerSpec *v1.Container + for j := range pod.Spec.Containers { + if pod.Spec.Containers[j].Name == c.Name { + containerSpec = &pod.Spec.Containers[j] + break + } + } + if containerSpec != nil { + m.setReadyStateOnKubeletRestart(&ready, pod, &podStatus.ContainerStatuses[i], containerSpec) + } + } } podStatus.ContainerStatuses[i].Ready = ready } @@ -356,6 +411,9 @@ func (m *manager) UpdatePodStatus(ctx context.Context, pod *v1.Pod, podStatus *v logger.Info("Failed to trigger a manual run", "probe", w.probeType.String()) } } + if !utilfeature.DefaultFeatureGate.Enabled(features.ChangeContainerStatusOnKubeletRestart) { + m.setReadyStateOnKubeletRestart(&ready, pod, &podStatus.InitContainerStatuses[i], &initContainer) + } } podStatus.InitContainerStatuses[i].Ready = ready } @@ -381,3 +439,12 @@ func (m *manager) workerCount() int { defer m.workerLock.RUnlock() return len(m.workers) } + +// kubeletRestartGracePeriod returns a time point that is 10 seconds before the kubelet start time. +// This grace period is used to determine if a container was already running before kubelet restarted. +// If a container's start time is before this grace period, it indicates the container was running +// prior to kubelet restart and should not be immediately marked as failed to avoid unnecessary +// status changes for containers that were previously ready. 
// kubeletRestartGraceWindow is the margin subtracted from the start time passed
// to kubeletRestartGracePeriod. Keeping it as a named constant gives the
// 10-second value a single definition instead of an inline literal.
const kubeletRestartGraceWindow = 10 * time.Second

// kubeletRestartGracePeriod returns the cutoff instant that lies
// kubeletRestartGraceWindow (10 seconds) before start.
//
// Despite the name, the result is a point in time, not a duration. Callers
// compare a container's start time against it with Before: a start time
// earlier than the cutoff is treated as belonging to a container that was
// already running before start, so its probe-derived status is not reset.
func kubeletRestartGracePeriod(start time.Time) time.Time {
	return start.Add(-kubeletRestartGraceWindow)
}
w.onHold = false } diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index fb728d47086..11bcb76f70c 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/fake" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" @@ -779,3 +780,140 @@ func TestStartupProbeDisabledByStarted(t *testing.T) { expectContinue(t, w, w.doProbe(ctx), msg) expectResult(t, w, results.Success, msg) } + +func TestChangeContainerStatusOnKubeletRestart(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + + tests := []struct { + name string + featureEnabled bool + isRestart bool + probeType probeType + initialValue results.Result + expectSet bool + }{ + { + name: "feature enabled, is restart, readiness", + featureEnabled: true, + isRestart: true, + probeType: readiness, + initialValue: results.Failure, + expectSet: true, + }, + { + name: "feature enabled, is restart, liveness", + featureEnabled: true, + isRestart: true, + probeType: liveness, + initialValue: results.Success, + expectSet: true, + }, + { + name: "feature enabled, is restart, startup", + featureEnabled: true, + isRestart: true, + probeType: startup, + initialValue: results.Unknown, + expectSet: true, + }, + { + name: "feature enabled, not restart, readiness", + featureEnabled: true, + isRestart: false, + probeType: readiness, + initialValue: results.Failure, + expectSet: true, + }, + { + name: "feature enabled, not restart, liveness", + featureEnabled: true, + isRestart: false, + probeType: liveness, + initialValue: results.Success, + expectSet: true, + }, + { + name: "feature enabled, not restart, startup", + featureEnabled: true, + isRestart: false, + probeType: startup, + 
initialValue: results.Unknown, + expectSet: true, + }, + { + name: "feature disabled, is restart, readiness", + featureEnabled: false, + isRestart: true, + probeType: readiness, + expectSet: false, + }, + { + name: "feature disabled, is restart, liveness", + featureEnabled: false, + isRestart: true, + probeType: liveness, + expectSet: false, + }, + { + name: "feature disabled, is restart, startup", + featureEnabled: false, + isRestart: true, + probeType: startup, + expectSet: false, + }, + { + name: "feature disabled, not restart, readiness", + featureEnabled: false, + isRestart: false, + probeType: readiness, + initialValue: results.Failure, + expectSet: true, + }, + { + name: "feature disabled, not restart, liveness", + featureEnabled: false, + isRestart: false, + probeType: liveness, + initialValue: results.Success, + expectSet: true, + }, + { + name: "feature disabled, not restart, startup", + featureEnabled: false, + isRestart: false, + probeType: startup, + initialValue: results.Unknown, + expectSet: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ChangeContainerStatusOnKubeletRestart, tc.featureEnabled) + + m := newTestManager() + podStatus := getTestRunningStatus() + podStatus.ContainerStatuses[0].ContainerID = "test://container-id" + if tc.isRestart { + podStatus.ContainerStatuses[0].State.Running.StartedAt = metav1.Time{Time: m.start.Add(-5 * time.Minute)} + } else { + podStatus.ContainerStatuses[0].State.Running.StartedAt = metav1.Time{Time: m.start.Add(5 * time.Minute)} + } + + w := newTestWorker(m, tc.probeType, v1.Probe{InitialDelaySeconds: 1000}) + m.statusManager.SetPodStatus(logger, w.pod, podStatus) + + w.doProbe(ctx) + + containerID := kubecontainer.ParseContainerID(podStatus.ContainerStatuses[0].ContainerID) + result, ok := resultsManager(m, tc.probeType).Get(containerID) + + if ok != tc.expectSet { + t.Errorf("Expected 
result to be set: %v, but got: %v", tc.expectSet, ok) + } + if tc.expectSet && result != tc.initialValue { + t.Errorf("Expected result %v, but got: %v", tc.initialValue, result) + } + }) + } +} diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index 1434d17e57c..b24a483a872 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -175,6 +175,16 @@ lockToDefault: false preRelease: Alpha version: "1.32" +- name: ChangeContainerStatusOnKubeletRestart + versionedSpecs: + - default: true + lockToDefault: false + preRelease: GA + version: "1.0" + - default: false + lockToDefault: false + preRelease: Deprecated + version: "1.35" - name: ClearingNominatedNodeNameAfterBinding versionedSpecs: - default: false diff --git a/test/e2e_node/container_lifecycle_test.go b/test/e2e_node/container_lifecycle_test.go index 1767e200808..71d6d027fec 100644 --- a/test/e2e_node/container_lifecycle_test.go +++ b/test/e2e_node/container_lifecycle_test.go @@ -26,12 +26,13 @@ import ( "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - admissionapi "k8s.io/pod-security-admission/api" - + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/e2e/framework" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" imageutils "k8s.io/kubernetes/test/utils/image" + admissionapi "k8s.io/pod-security-admission/api" "k8s.io/utils/ptr" ) @@ -6304,3 +6305,630 @@ var _ = SIGDescribe(framework.WithNodeConformance(), framework.WithSerial(), "Co }) }) }) + +var _ = SIGDescribe(framework.WithSerial(), "Not Change Container Status", framework.WithFeatureGate(features.ChangeContainerStatusOnKubeletRestart), func() { + f := framework.NewDefaultFramework("not-change-container-status-test-serial") 
+ addAfterEachForCleaningUpPods(f) + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + ginkgo.When("a Pod is running", func() { + testKubeletRestart := func(ctx context.Context, pod *v1.Pod) { + client := e2epod.NewPodClient(f) + pod = client.Create(ctx, pod) + + ginkgo.By("Waiting for the pod to be running and ready") + err := e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "PodReady", f.Timeouts.PodStart, + func(p *v1.Pod) (bool, error) { + if p.Status.Phase != v1.PodRunning { + return false, nil + } + for _, cond := range p.Status.Conditions { + if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { + return true, nil + } + } + return false, nil + }) + framework.ExpectNoError(err) + + // Double check the initial state before starting the concurrent check + p, err := client.Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(p.Status.ContainerStatuses).ToNot(gomega.BeEmpty()) + for _, status := range p.Status.ContainerStatuses { + gomega.Expect(status.RestartCount).To(gomega.BeZero()) + gomega.Expect(status.Started).ToNot(gomega.BeNil()) + gomega.Expect(*status.Started).To(gomega.BeTrueBecause("The Started field should be set to true when a pod enters the Ready condition.")) + gomega.Expect(status.Ready).To(gomega.BeTrueBecause("The Ready field should be set to true when a pod enters the Ready condition.")) + } + + // The grace period for kubelet startup is 10 seconds, so we wait here for 11 seconds. 
+ time.Sleep(time.Second * 11) + + stopCh := make(chan struct{}) + errCh := make(chan error, 1) + go func() { + watcher, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: "metadata.name=" + pod.Name, + }) + if err != nil { + errCh <- fmt.Errorf("failed to watch pod: %w", err) + return + } + defer watcher.Stop() + + for { + select { + case event, ok := <-watcher.ResultChan(): + if !ok { + return + } + if event.Type != watch.Modified { + continue + } + p, ok := event.Object.(*v1.Pod) + if !ok { + continue + } + + if p.Status.Phase != v1.PodRunning { + errCh <- fmt.Errorf("pod phase is %v, expected %v", p.Status.Phase, v1.PodRunning) + return + } + if len(p.Status.ContainerStatuses) < len(pod.Spec.Containers) { + continue + } + for _, containerStatus := range p.Status.ContainerStatuses { + if containerStatus.RestartCount > 0 { + errCh <- fmt.Errorf("container %q restarted %d times", containerStatus.Name, containerStatus.RestartCount) + return + } + if containerStatus.Started == nil || !*containerStatus.Started { + errCh <- fmt.Errorf("container %q started status is not true", containerStatus.Name) + return + } + if !containerStatus.Ready { + errCh <- fmt.Errorf("container %q ready status is not true", containerStatus.Name) + return + } + } + case <-stopCh: + close(errCh) + return + } + } + }() + + ginkgo.By("restarting the kubelet") + restartKubelet := mustStopKubelet(ctx, f) + restartKubelet(ctx) + + ginkgo.By("ensuring kubelet is healthy") + gomega.Eventually(ctx, func() bool { + return kubeletHealthCheck(kubeletHealthCheckURL) + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrueBecause("kubelet should be started")) + + // Let the goroutine run for a few more seconds to catch any delayed changes + time.Sleep(5 * time.Second) + close(stopCh) + + // Check for errors from the goroutine + for err := range errCh { + framework.ExpectNoError(err, "pod status check failed during kubelet restart") + } + } + + 
ginkgo.It("should not affect pod status when pod has no probe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-no-probe", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has startupProbe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-with-startup-probe", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has readinessProbe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-with-readiness-probe", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has startupProbe and readinessProbe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-with-startup-and-readiness-probe", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 
1, + PeriodSeconds: 1, + }, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has multiple containers", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-with-multiple-containers", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + }, + { + Name: "container2", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has multiple containers with startupProbes", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-mc-with-startup-probes", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + { + Name: "container2", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has multiple containers with readinessProbes", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-mc-with-readiness-probes", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + ReadinessProbe: 
&v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + { + Name: "container2", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + + ginkgo.It("should not affect pod status when pod has multiple containers with startup and readiness probes", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-mc-with-startup-and-readiness-probes", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + { + Name: "container2", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + }, + } + testKubeletRestart(ctx, pod) + }) + }) + + ginkgo.When("a Pod is running with a restartable init container", func() { + testKubeletRestartForRestartableInit := func(ctx context.Context, pod *v1.Pod) { + client := e2epod.NewPodClient(f) + pod = client.Create(ctx, pod) + + 
ginkgo.By("Waiting for the pod to be running and ready") + err := e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "PodReady", f.Timeouts.PodStart, + func(p *v1.Pod) (bool, error) { + if p.Status.Phase != v1.PodRunning { + return false, nil + } + for _, cond := range p.Status.Conditions { + if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { + return true, nil + } + } + return false, nil + }) + framework.ExpectNoError(err) + + // Double check the initial state before starting the concurrent check + p, err := client.Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(p.Status.InitContainerStatuses).ToNot(gomega.BeEmpty()) + gomega.Expect(p.Status.InitContainerStatuses[0].RestartCount).To(gomega.BeZero()) + gomega.Expect(p.Status.InitContainerStatuses[0].Started).ToNot(gomega.BeNil()) + gomega.Expect(*p.Status.InitContainerStatuses[0].Started).To(gomega.BeTrueBecause("The Started field should be set to true when a pod enters the Ready condition.")) + gomega.Expect(p.Status.InitContainerStatuses[0].Ready).To(gomega.BeTrueBecause("The Ready field should be set to true when a pod enters the Ready condition.")) + + // The grace period for kubelet startup is 10 seconds, so we wait here for 11 seconds. 
+ time.Sleep(time.Second * 11) + + stopCh := make(chan struct{}) + errCh := make(chan error, 1) + go func() { + watcher, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: "metadata.name=" + pod.Name, + }) + if err != nil { + errCh <- fmt.Errorf("failed to watch pod: %w", err) + return + } + defer watcher.Stop() + + for { + select { + case event, ok := <-watcher.ResultChan(): + if !ok { + return + } + if event.Type != watch.Modified { + continue + } + p, ok := event.Object.(*v1.Pod) + if !ok { + continue + } + + if p.Status.Phase != v1.PodRunning { + errCh <- fmt.Errorf("pod phase is %v, expected %v", p.Status.Phase, v1.PodRunning) + return + } + if len(p.Status.InitContainerStatuses) == 0 { + errCh <- fmt.Errorf("pod has no init container statuses") + return + } + containerStatus := p.Status.InitContainerStatuses[0] + if containerStatus.RestartCount > 0 { + errCh <- fmt.Errorf("container restarted %d times", containerStatus.RestartCount) + return + } + if containerStatus.Started == nil || !*containerStatus.Started { + errCh <- fmt.Errorf("container started status is not true") + return + } + if !containerStatus.Ready { + errCh <- fmt.Errorf("container ready status is not true") + return + } + case <-stopCh: + close(errCh) + return + } + } + }() + + ginkgo.By("restarting the kubelet") + restartKubelet := mustStopKubelet(ctx, f) + restartKubelet(ctx) + + ginkgo.By("ensuring kubelet is healthy") + gomega.Eventually(ctx, func() bool { + return kubeletHealthCheck(kubeletHealthCheckURL) + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrueBecause("kubelet should be started")) + + // Let the goroutine run for a few more seconds to catch any delayed changes + time.Sleep(5 * time.Second) + close(stopCh) + + // Check for errors from the goroutine + for err := range errCh { + framework.ExpectNoError(err, "pod status check failed during kubelet restart") + } + } + + ginkgo.It("should not affect pod status when restartable 
init container has no probe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-restartable-init-no-probe", + }, + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Name: "restartable-init", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + RestartPolicy: &containerRestartPolicyAlways, + }, + }, + Containers: []v1.Container{ + { + Name: "container", + Image: imageutils.GetPauseImageName(), + }, + }, + }, + } + testKubeletRestartForRestartableInit(ctx, pod) + }) + + ginkgo.It("should not affect pod status when restartable init container has startupProbe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-restartable-init-with-startup-probe", + }, + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Name: "restartable-init", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + RestartPolicy: &containerRestartPolicyAlways, + StartupProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + Containers: []v1.Container{ + { + Name: "container", + Image: imageutils.GetPauseImageName(), + }, + }, + }, + } + testKubeletRestartForRestartableInit(ctx, pod) + }) + + ginkgo.It("should not affect pod status when restartable init container has readinessProbe", func(ctx context.Context) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-restartable-init-with-readiness-probe", + }, + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Name: "restartable-init", + Image: defaultImage, + Command: []string{"sleep", "3600"}, + RestartPolicy: &containerRestartPolicyAlways, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/true"}, + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 1, + }, + }, + }, + Containers: []v1.Container{ + { + Name: "container", + Image: 
imageutils.GetPauseImageName(),
+						},
+					},
+				},
+			}
+			testKubeletRestartForRestartableInit(ctx, pod)
+		})
+
+		ginkgo.It("should not affect pod status when restartable init container has startupProbe and readinessProbe", func(ctx context.Context) {
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "pod-restartable-init-with-startup-and-readiness-probe",
+				},
+				Spec: v1.PodSpec{
+					InitContainers: []v1.Container{
+						{
+							Name:          "restartable-init",
+							Image:         defaultImage,
+							Command:       []string{"sleep", "3600"},
+							RestartPolicy: &containerRestartPolicyAlways,
+							StartupProbe: &v1.Probe{
+								ProbeHandler: v1.ProbeHandler{
+									Exec: &v1.ExecAction{
+										Command: []string{"/bin/true"},
+									},
+								},
+								InitialDelaySeconds: 1,
+								PeriodSeconds:       1,
+							},
+							ReadinessProbe: &v1.Probe{
+								ProbeHandler: v1.ProbeHandler{
+									Exec: &v1.ExecAction{
+										Command: []string{"/bin/true"},
+									},
+								},
+								InitialDelaySeconds: 1,
+								PeriodSeconds:       1,
+							},
+						},
+					},
+					Containers: []v1.Container{
+						{
+							Name:  "container",
+							Image: imageutils.GetPauseImageName(),
+						},
+					},
+				},
+			}
+			testKubeletRestartForRestartableInit(ctx, pod)
+		})
+	})
+})
diff --git a/test/e2e_node/mirror_pod_test.go b/test/e2e_node/mirror_pod_test.go
index 60f8141abc5..b5cec71d47b 100644
--- a/test/e2e_node/mirror_pod_test.go
+++ b/test/e2e_node/mirror_pod_test.go
@@ -41,7 +41,9 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/printers"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
 )
 
@@ -671,3 +673,181 @@ func checkMirrorPodRecreated(ctx context.Context, cl clientset.Interface, name,
 	}
 	return nil
 }
+
+// This serial MirrorPod suite creates a static pod whose single container has
+// exec startup and readiness probes, restarts the kubelet, and checks that the
+// mirror pod's container statuses are not changed by the restart.
+var _ = SIGDescribe("MirrorPod", framework.WithSerial(), func() {
+	f := framework.NewDefaultFramework("mirror-pod-serial")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+	ginkgo.Context("when kubelet restarts", func() {
+		var ns, podPath, staticPodName, mirrorPodName string
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			ns = f.Namespace.Name
+			staticPodName = "static-pod-" + string(uuid.NewUUID())
+			mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName
+			podPath = kubeletCfg.StaticPodPath
+
+			ginkgo.By("create the static pod")
+			podSpec := v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:    "container",
+						Image:   defaultImage,
+						Command: []string{"sleep", "3600"},
+						StartupProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"/bin/true"},
+								},
+							},
+							InitialDelaySeconds: 1,
+							PeriodSeconds:       1,
+						},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"/bin/true"},
+								},
+							},
+							InitialDelaySeconds: 1,
+							PeriodSeconds:       1,
+						},
+					},
+				},
+			}
+
+			err := createStaticPodWithSpec(podPath, staticPodName, ns, podSpec)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for the mirror pod to be running")
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
+			}, 2*time.Minute, time.Second*4).Should(gomega.Succeed())
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			ginkgo.By("delete the static pod")
+			err := deleteStaticPod(podPath, staticPodName, ns)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for the mirror pod to disappear")
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				return checkMirrorPodDisappear(ctx, f.ClientSet, mirrorPodName, ns)
+			}, 2*time.Minute, time.Second*4).Should(gomega.Succeed())
+		})
+
+		f.It("should not change container status", f.WithNodeConformance(), func(ctx context.Context) {
+			ginkgo.By("Waiting for the pod to be running and ready")
+			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, ns, mirrorPodName, "PodReady", f.Timeouts.PodStart,
+				func(p *v1.Pod) (bool, error) {
+					if p.Status.Phase != v1.PodRunning {
+						return false, nil
+					}
+					for _, cond := range p.Status.Conditions {
+						if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
+							return true, nil
+						}
+					}
+					return false, nil
+				})
+			framework.ExpectNoError(err)
+
+			pod, err := f.ClientSet.CoreV1().Pods(ns).Get(ctx, mirrorPodName, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Double check the initial state before starting the concurrent check")
+			gomega.Expect(pod.Status.ContainerStatuses).ToNot(gomega.BeEmpty())
+			for _, status := range pod.Status.ContainerStatuses {
+				gomega.Expect(status.RestartCount).To(gomega.BeZero())
+				gomega.Expect(status.Started).ToNot(gomega.BeNil())
+				gomega.Expect(*status.Started).To(gomega.BeTrueBecause("The Started field should be set to true when a pod enters the Ready condition."))
+				gomega.Expect(status.Ready).To(gomega.BeTrueBecause("The Ready field should be set to true when a pod enters the Ready condition."))
+			}
+
+			// The grace period for kubelet startup is 10 seconds, so we wait here for 11 seconds.
+			time.Sleep(time.Second * 11)
+
+			// Watch the mirror pod in the background while the kubelet restarts and
+			// report the first Modified event that shows a regression through errCh.
+			// stopCh tells the watcher to finish once the observation window is over.
+			stopCh := make(chan struct{})
+			errCh := make(chan error, 1)
+			go func() {
+				defer ginkgo.GinkgoRecover()
+				// Close errCh on every exit path (watch failure, watch channel
+				// closed, violation found, or stop requested) so that the
+				// range loop over errCh below always terminates.
+				defer close(errCh)
+				watcher, err := f.ClientSet.CoreV1().Pods(ns).Watch(ctx, metav1.ListOptions{
+					FieldSelector: "metadata.name=" + mirrorPodName,
+				})
+				if err != nil {
+					errCh <- fmt.Errorf("failed to watch pod: %w", err)
+					return
+				}
+				defer watcher.Stop()
+
+				for {
+					select {
+					case event, ok := <-watcher.ResultChan():
+						if !ok {
+							// The watch ended on its own; stop observing.
+							return
+						}
+						if event.Type != watch.Modified {
+							continue
+						}
+						p, ok := event.Object.(*v1.Pod)
+						if !ok {
+							continue
+						}
+
+						if p.Status.Phase != v1.PodRunning {
+							errCh <- fmt.Errorf("pod phase is %v, expected %v", p.Status.Phase, v1.PodRunning)
+							return
+						}
+						// Skip updates that do not yet carry a status for every container.
+						if len(p.Status.ContainerStatuses) < len(pod.Spec.Containers) {
+							continue
+						}
+						for _, containerStatus := range p.Status.ContainerStatuses {
+							if containerStatus.RestartCount > 0 {
+								errCh <- fmt.Errorf("container %q restarted %d times", containerStatus.Name, containerStatus.RestartCount)
+								return
+							}
+							if containerStatus.Started == nil || !*containerStatus.Started {
+								errCh <- fmt.Errorf("container %q started status is not true", containerStatus.Name)
+								return
+							}
+							if !containerStatus.Ready {
+								errCh <- fmt.Errorf("container %q ready status is not true", containerStatus.Name)
+								return
+							}
+						}
+					case <-stopCh:
+						return
+					}
+				}
+			}()
+
+			ginkgo.By("restarting the kubelet")
+			restartKubelet := mustStopKubelet(ctx, f)
+			restartKubelet(ctx)
+
+			ginkgo.By("ensuring kubelet is healthy")
+			gomega.Eventually(ctx, func() bool {
+				return kubeletHealthCheck(kubeletHealthCheckURL)
+			}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrueBecause("kubelet should be started"))
+
+			// Let the goroutine run for a few more seconds to catch any delayed changes
+			time.Sleep(5 * time.Second)
+			close(stopCh)
+
+			// Receives the buffered error, if any, and ends when the watcher goroutine closes errCh.
+			for err := range errCh {
+				framework.ExpectNoError(err, "pod status check failed during kubelet restart")
+			}
+		})
+	})
+})
diff --git a/test/e2e_node/standalone_test.go b/test/e2e_node/standalone_test.go
index 51cf0b8f3c2..c8420aaa001 100644
--- a/test/e2e_node/standalone_test.go
+++ b/test/e2e_node/standalone_test.go
@@ -328,3 +328,83 @@ func decodePods(respBody []byte) (*v1.PodList, error) {
 
 	return &pods, nil
 }
+
+// This serial standalone-mode suite schedules a basic static pod, restarts the
+// kubelet, and verifies the pod is reported running and ready again afterwards.
+var _ = SIGDescribe(feature.StandaloneMode, framework.WithSerial(), func() {
+	f := framework.NewDefaultFramework("static-pod-serial")
+	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
+	ginkgo.Context("when creating a static pod and restarting kubelet", func() {
+		var ns, podPath, staticPodName string
+
+		ginkgo.BeforeEach(func() {
+			ns = f.Namespace.Name
+			staticPodName = "static-pod-" + string(uuid.NewUUID())
+			podPath = kubeletCfg.StaticPodPath
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			ginkgo.By(fmt.Sprintf("delete the static pod (%v/%v)", ns, staticPodName))
+			err := deleteStaticPod(podPath, staticPodName, ns)
+			framework.ExpectNoError(err)
+
+			ginkgo.By(fmt.Sprintf("wait for pod to disappear (%v/%v)", ns, staticPodName))
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				_, err := getPodFromStandaloneKubelet(ctx, ns, staticPodName)
+
+				if apierrors.IsNotFound(err) {
+					return nil
+				}
+				return fmt.Errorf("pod (%v/%v) still exists", ns, staticPodName)
+			}).Should(gomega.Succeed())
+		})
+
+		ginkgo.It("the pod should be running and kubelet not panic", func(ctx context.Context) {
+			err := scheduleStaticPod(podPath, staticPodName, ns, createBasicStaticPodSpec(staticPodName, ns))
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Waiting for the pod to be running")
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				pod, err := getPodFromStandaloneKubelet(ctx, ns, staticPodName)
+				if err != nil {
+					return fmt.Errorf("error getting pod(%v/%v) from standalone kubelet: %w", ns, staticPodName, err)
+				}
+
+				isReady, err := testutils.PodRunningReady(pod)
+				if err != nil {
+					return fmt.Errorf("error checking if pod (%v/%v) is running ready: %w", ns, staticPodName, err)
+				}
+				if !isReady {
+					return fmt.Errorf("pod (%v/%v) is not running", ns, staticPodName)
+				}
+				return nil
+			}, f.Timeouts.PodStart, time.Second*5).Should(gomega.Succeed())
+
+			ginkgo.By("stopping the kubelet")
+			restartKubelet := mustStopKubelet(ctx, f)
+
+			ginkgo.By("restarting the kubelet")
+			restartKubelet(ctx)
+
+			gomega.Eventually(ctx, func() bool {
+				return kubeletHealthCheck(kubeletHealthCheckURL)
+			}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrueBecause("kubelet should be started"))
+
+			ginkgo.By("ensuring that pod is running")
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				pod, err := getPodFromStandaloneKubelet(ctx, ns, staticPodName)
+				if err != nil {
+					return fmt.Errorf("error getting pod(%v/%v) from standalone kubelet: %w", ns, staticPodName, err)
+				}
+				isReady, err := testutils.PodRunningReady(pod)
+				if err != nil {
+					return fmt.Errorf("error checking if pod (%v/%v) is running ready: %w", ns, staticPodName, err)
+				}
+				if !isReady {
+					return fmt.Errorf("pod (%v/%v) is not running", ns, staticPodName)
+				}
+				return nil
+			}, f.Timeouts.PodStart, time.Second*30).Should(gomega.Succeed())
+		})
+	})
+})