diff --git a/test/e2e_dra/upgradedowngrade_test.go b/test/e2e_dra/upgradedowngrade_test.go index 76b78001cc4..9999bfc350b 100644 --- a/test/e2e_dra/upgradedowngrade_test.go +++ b/test/e2e_dra/upgradedowngrade_test.go @@ -218,7 +218,7 @@ func testUpgradeDowngrade(tCtx ktesting.TContext) { "FEATURE_GATES": "DynamicResourceAllocation=true,DRADeviceTaintRules=true,DRADeviceTaints=true,DRAExtendedResource=true", // *not* needed because driver will run in "local filesystem" mode (= driver.IsLocal): "ALLOW_PRIVILEGED": "1", } - cluster.Start(tCtx, binDir, localUpClusterEnv) + cluster.Start(tCtx, fmt.Sprintf("0-initial-%d.%d", major, previousMinor), binDir, localUpClusterEnv) }) restConfig := cluster.LoadConfig(tCtx) @@ -256,7 +256,7 @@ func testUpgradeDowngrade(tCtx ktesting.TContext) { // We could split this up into first updating the apiserver, then control plane components, then restarting kubelet. // For the purpose of this test here we we primarily care about full before/after comparisons, so not done yet. // TODO - restoreOptions := cluster.Modify(tCtx.WithStep(fmt.Sprintf("update to %s", gitVersion)), localupcluster.ModifyOptions{Upgrade: true, BinDir: dir}) + restoreOptions := cluster.Modify(tCtx.WithStep(fmt.Sprintf("update to %s", gitVersion)), "1-"+gitVersion, localupcluster.ModifyOptions{Upgrade: true, BinDir: dir}) // The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running. // Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices. @@ -272,7 +272,7 @@ func testUpgradeDowngrade(tCtx ktesting.TContext) { }) // Roll back. - cluster.Modify(tCtx.WithStep("downgrade"), restoreOptions) + cluster.Modify(tCtx.WithStep("downgrade"), fmt.Sprintf("2-restored-%d.%d", major, previousMinor), restoreOptions) tCtx.Run("after-cluster-downgrade", func(tCtx ktesting.TContext) { for subTest, f := range downgradedTestFuncs { diff --git a/test/utils/localupcluster/cmd.go b/test/utils/localupcluster/cmd.go index 3695bcce4fb..4c3452d528e 100644 --- a/test/utils/localupcluster/cmd.go +++ b/test/utils/localupcluster/cmd.go @@ -31,7 +31,6 @@ import ( "time" "k8s.io/kubernetes/test/utils/ktesting" - "k8s.io/utils/ptr" ) type Cmd struct { @@ -73,7 +72,7 @@ type Cmd struct { cancel func(string) cmd *exec.Cmd wg sync.WaitGroup - running atomic.Pointer[bool] + running atomic.Bool result error gathering bool @@ -121,7 +120,7 @@ func (c *Cmd) Start(tCtx ktesting.TContext) { c.cmd.Stderr = writer tCtx.ExpectNoError(c.cmd.Start(), "start %s command", c.Name) - c.running.Store(ptr.To(true)) + c.running.Store(true) if reader != nil { scanner := bufio.NewScanner(reader) @@ -157,7 +156,9 @@ func (c *Cmd) Start(tCtx ktesting.TContext) { c.wg.Add(1) go func() { defer c.wg.Done() + // tCtx.Logf("Starting to wait for termination of command %s", c.Name) c.result = c.cmd.Wait() + tCtx.Logf("Command %s terminated, result: %v", c.Name, c.result) now := time.Now() if reader != nil { // Has to be closed to stop output processing, otherwise the scanner @@ -176,7 +177,7 @@ func (c *Cmd) Start(tCtx ktesting.TContext) { } } } - c.running.Store(ptr.To(false)) + c.running.Store(false) }() } @@ -190,6 +191,15 @@ func (c *Cmd) Stop(tCtx ktesting.TContext, reason string) string { // Not started... return "" } + if c.LogFile != "" { + f, err := os.OpenFile(c.LogFile, os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + defer func() { + _ = f.Close() + }() + _, _ = fmt.Fprintf(f, "%s: killing: %s\n", time.Now(), reason) + } + } c.cancel(reason) return c.wait(tCtx, true) } @@ -197,6 +207,9 @@ func (c *Cmd) Stop(tCtx ktesting.TContext, reason string) string { func (c *Cmd) wait(tCtx ktesting.TContext, killed bool) string { tCtx.Helper() c.wg.Wait() + if c.running.Load() { + tCtx.Fatalf("command %s should have stopped but didn't", c.Name) + } if !killed { tCtx.ExpectNoError(c.result, fmt.Sprintf("%s command failed, output:\n%s", c.Name, c.output.String())) } @@ -204,7 +217,7 @@ func (c *Cmd) wait(tCtx ktesting.TContext, killed bool) string { } func (c *Cmd) Running() bool { - return ptr.Deref(c.running.Load(), false) + return c.running.Load() } func (c *Cmd) Output(tCtx ktesting.TContext) string { diff --git a/test/utils/localupcluster/localupcluster.go b/test/utils/localupcluster/localupcluster.go index a46869ae497..9dd1c45ec5c 100644 --- a/test/utils/localupcluster/localupcluster.go +++ b/test/utils/localupcluster/localupcluster.go @@ -124,7 +124,7 @@ type Cluster struct { // Kubernetes release. They will be invoked with parameters as defined in the // *current* local-up-cluster.sh. This works as long as local-up-cluster.sh in its // default configuration doesn't depend on something which was added only recently. -func (c *Cluster) Start(tCtx ktesting.TContext, bindir string, localUpClusterEnv map[string]string) { +func (c *Cluster) Start(tCtx ktesting.TContext, state string, bindir string, localUpClusterEnv map[string]string) { tCtx.Helper() c.Stop(tCtx) tCtx.CleanupCtx(func(tCtx ktesting.TContext) { @@ -185,7 +185,7 @@ processLocalUpClusterOutput: c.Stop(tCtx) tCtx.Fatalf("interrupted cluster startup: %v", context.Cause(tCtx)) case output := <-lines: - if c.processLocalUpClusterOutput(tCtx, output) { + if c.processLocalUpClusterOutput(tCtx, state, output) { break processLocalUpClusterOutput } } @@ -196,7 +196,7 @@ processLocalUpClusterOutput: // Matches e.g. "+ API_SECURE_PORT=6443". var varAssignment = regexp.MustCompile(`^\+ ([A-Z0-9_]+)=(.*)$`) -func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Output) bool { +func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, state string, output Output) bool { if output.EOF { if output.Line != "" { tCtx.Fatalf("%s output processing failed: %s", LocalUpCluster, output.Line) @@ -217,7 +217,7 @@ func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Out // Cluster components are kept running. if slices.Contains(KubeClusterComponents, KubeComponentName(name)) { - c.runKubeComponent(tCtx, KubeComponentName(name), cmdLine) + c.runKubeComponent(tCtx, state, KubeComponentName(name), cmdLine) return false } @@ -241,19 +241,18 @@ func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Out return false } -func (c *Cluster) runKubeComponent(tCtx ktesting.TContext, component KubeComponentName, command string) { +func (c *Cluster) runKubeComponent(tCtx ktesting.TContext, state string, component KubeComponentName, command string) { commandLine := fromLocalUpClusterOutput(command) cmd := &Cmd{ - Name: string(component), + Name: string(component) + "-" + state, CommandLine: commandLine, - // Number gets bumped when restarting. - LogFile: path.Join(c.dir, fmt.Sprintf("%s-0.log", component)), + LogFile: path.Join(c.dir, fmt.Sprintf("%s-%s.log", component, state)), // Stopped via Cluster.Stop. KeepRunning: true, } - c.runComponentWithRetry(tCtx, cmd) + c.runComponentWithRetry(tCtx, component, cmd) } func (c *Cluster) runCmd(tCtx ktesting.TContext, name, command string) { @@ -360,9 +359,14 @@ func (m ModifyOptions) GetComponentFile(component KubeComponentName) string { } // Modify changes the cluster as described in the options. -// It returns options that can be passed to Modify unchanged +// +// The state description is required. It gets used as +// file suffix for log files and in log messages. +// It needs to be unique through the life of the cluster. +// +// The returned options can be passed to Modify unchanged // to restore the original state. -func (c *Cluster) Modify(tCtx ktesting.TContext, options ModifyOptions) ModifyOptions { +func (c *Cluster) Modify(tCtx ktesting.TContext, state string, options ModifyOptions) ModifyOptions { tCtx.Helper() restore := ModifyOptions{ @@ -375,12 +379,12 @@ func (c *Cluster) Modify(tCtx ktesting.TContext, options ModifyOptions) ModifyOp slices.Reverse(components) } for _, component := range components { - c.modifyComponent(tCtx, options, component, &restore) + c.modifyComponent(tCtx, state, options, component, &restore) } return restore } -func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions, component KubeComponentName, restore *ModifyOptions) { +func (c *Cluster) modifyComponent(tCtx ktesting.TContext, state string, options ModifyOptions, component KubeComponentName, restore *ModifyOptions) { tCtx.Helper() tCtx = tCtx.WithStep(fmt.Sprintf("modify %s", component)) @@ -391,8 +395,11 @@ func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions, if !ok { tCtx.Fatal("not running") } + tCtx.Logf("killing command %s before replacing it", cmd.Name) cmd.Stop(tCtx, "modifying the component") delete(c.running, component) + tCtx.Logf("command %s with pid %d stopped: %s", cmd.Name, cmd.cmd.ProcessState.Pid(), cmd.cmd.ProcessState) + dumpProcesses(tCtx) // Find the command (might be wrapped by sudo!). cmdLine := slices.Clone(cmd.CommandLine) @@ -408,21 +415,15 @@ func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions, if !found { tCtx.Fatal("binary filename not found") } + cmd.Name = string(component) + "-" + state cmd.CommandLine = cmdLine + cmd.LogFile = path.Join(c.dir, fmt.Sprintf("%s-%s.log", component, state)) - // New log file. - m := regexp.MustCompile(`^(.*)-([[:digit:]]+)\.log$`).FindStringSubmatch(cmd.LogFile) - if m == nil { - tCtx.Fatalf("unexpected log file, should have contained number: %s", cmd.LogFile) - } - logNum, _ := strconv.Atoi(m[2]) - cmd.LogFile = fmt.Sprintf("%s-%d.log", m[1], logNum+1) - - c.runComponentWithRetry(tCtx, cmd) + c.runComponentWithRetry(tCtx, component, cmd) } } -func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) { +func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, component KubeComponentName, cmd *Cmd) { // Sometimes components fail to come up. We have to retry. // // For example, the apiserver's port might not be free again yet (no SO_LINGER!). @@ -434,7 +435,7 @@ func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) { for i := 0; ; i++ { tCtx.Logf("running %s with output redirected to %s", cmd.Name, cmd.LogFile) cmd.Start(tCtx) - c.running[KubeComponentName(cmd.Name)] = cmd + c.running[component] = cmd err := func() (finalErr error) { tCtx, finalize := tCtx.WithError(&finalErr) defer finalize() @@ -452,6 +453,8 @@ func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) { // Re-raise the failure. tCtx.ExpectNoError(err) } + tCtx.Logf("started %s with pid %d", cmd.Name, cmd.cmd.Process.Pid) + dumpProcesses(tCtx) } func (c *Cluster) checkReadiness(tCtx ktesting.TContext, cmd *Cmd) { @@ -459,16 +462,16 @@ func (c *Cluster) checkReadiness(tCtx ktesting.TContext, cmd *Cmd) { tCtx = tCtx.WithRESTConfig(restConfig) tCtx = tCtx.WithStep(fmt.Sprintf("wait for %s readiness", cmd.Name)) - switch KubeComponentName(cmd.Name) { - case KubeAPIServer: + switch { + case strings.HasPrefix(cmd.Name, string(KubeAPIServer)): c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["API_SECURE_PORT"]) - case KubeScheduler: + case strings.HasPrefix(cmd.Name, string(KubeScheduler)): c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["SCHEDULER_SECURE_PORT"]) - case KubeControllerManager: + case strings.HasPrefix(cmd.Name, string(KubeControllerManager)): c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["KCM_SECURE_PORT"]) - case KubeProxy: + case strings.HasPrefix(cmd.Name, string(KubeProxy)): c.checkHealthz(tCtx, cmd, "http" /* not an error! */, c.settings["API_HOST_IP"], c.settings["PROXY_HEALTHZ_PORT"]) - case Kubelet: + case strings.HasPrefix(cmd.Name, string(Kubelet)): c.checkHealthz(tCtx, cmd, "https", c.settings["KUBELET_HOST"], c.settings["KUBELET_PORT"]) // Also wait for the node to be ready. @@ -508,3 +511,16 @@ func (c *Cluster) checkHealthz(tCtx ktesting.TContext, cmd *Cmd, method, hostIP, return nil }).Should(gomega.Succeed(), fmt.Sprintf("HTTP GET %s", url)) } + +func dumpProcesses(tCtx ktesting.TContext) { + // Uncomment the code to get additional debug output. + // + // cmd := &Cmd{ + // Name: "ps", + // CommandLine: []string{"ps", "-efww", "--forest"}, + // GatherOutput: true, + // } + // cmd.Start(tCtx) + // processes := cmd.Wait(tCtx) + // tCtx.Log(processes) +}