mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-13 19:01:10 -04:00
DRA upgrade/downgrade: add some debug output for stopping commands
In some (all?) CI jobs the initial kubelet instance keeps running, despite command context cancellation. Not reproducible locally, so additional output was necessary to track down the root cause in CI runs: signal propagation via sudo didn't work for kube-proxy and kubelet, but only for those two and only in the CI. The fix is to change the CI jobs so that they disable the usage of sudo. While at it, simplify by replacing atomic.Pointer with atomic.Boole.
This commit is contained in:
parent
3f997b58ed
commit
f692e4e8f0
3 changed files with 67 additions and 38 deletions
|
|
@ -218,7 +218,7 @@ func testUpgradeDowngrade(tCtx ktesting.TContext) {
|
|||
"FEATURE_GATES": "DynamicResourceAllocation=true,DRADeviceTaintRules=true,DRADeviceTaints=true,DRAExtendedResource=true",
|
||||
// *not* needed because driver will run in "local filesystem" mode (= driver.IsLocal): "ALLOW_PRIVILEGED": "1",
|
||||
}
|
||||
cluster.Start(tCtx, binDir, localUpClusterEnv)
|
||||
cluster.Start(tCtx, fmt.Sprintf("0-initial-%d.%d", major, previousMinor), binDir, localUpClusterEnv)
|
||||
})
|
||||
|
||||
restConfig := cluster.LoadConfig(tCtx)
|
||||
|
|
@ -256,7 +256,7 @@ func testUpgradeDowngrade(tCtx ktesting.TContext) {
|
|||
// We could split this up into first updating the apiserver, then control plane components, then restarting kubelet.
|
||||
// For the purpose of this test here we we primarily care about full before/after comparisons, so not done yet.
|
||||
// TODO
|
||||
restoreOptions := cluster.Modify(tCtx.WithStep(fmt.Sprintf("update to %s", gitVersion)), localupcluster.ModifyOptions{Upgrade: true, BinDir: dir})
|
||||
restoreOptions := cluster.Modify(tCtx.WithStep(fmt.Sprintf("update to %s", gitVersion)), "1-"+gitVersion, localupcluster.ModifyOptions{Upgrade: true, BinDir: dir})
|
||||
|
||||
// The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running.
|
||||
// Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices.
|
||||
|
|
@ -272,7 +272,7 @@ func testUpgradeDowngrade(tCtx ktesting.TContext) {
|
|||
})
|
||||
|
||||
// Roll back.
|
||||
cluster.Modify(tCtx.WithStep("downgrade"), restoreOptions)
|
||||
cluster.Modify(tCtx.WithStep("downgrade"), fmt.Sprintf("2-restored-%d.%d", major, previousMinor), restoreOptions)
|
||||
|
||||
tCtx.Run("after-cluster-downgrade", func(tCtx ktesting.TContext) {
|
||||
for subTest, f := range downgradedTestFuncs {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
type Cmd struct {
|
||||
|
|
@ -73,7 +72,7 @@ type Cmd struct {
|
|||
cancel func(string)
|
||||
cmd *exec.Cmd
|
||||
wg sync.WaitGroup
|
||||
running atomic.Pointer[bool]
|
||||
running atomic.Bool
|
||||
result error
|
||||
gathering bool
|
||||
|
||||
|
|
@ -121,7 +120,7 @@ func (c *Cmd) Start(tCtx ktesting.TContext) {
|
|||
c.cmd.Stderr = writer
|
||||
|
||||
tCtx.ExpectNoError(c.cmd.Start(), "start %s command", c.Name)
|
||||
c.running.Store(ptr.To(true))
|
||||
c.running.Store(true)
|
||||
|
||||
if reader != nil {
|
||||
scanner := bufio.NewScanner(reader)
|
||||
|
|
@ -157,7 +156,9 @@ func (c *Cmd) Start(tCtx ktesting.TContext) {
|
|||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
// tCtx.Logf("Starting to wait for termination of command %s", c.Name)
|
||||
c.result = c.cmd.Wait()
|
||||
tCtx.Logf("Command %s terminated, result: %v", c.Name, c.result)
|
||||
now := time.Now()
|
||||
if reader != nil {
|
||||
// Has to be closed to stop output processing, otherwise the scanner
|
||||
|
|
@ -176,7 +177,7 @@ func (c *Cmd) Start(tCtx ktesting.TContext) {
|
|||
}
|
||||
}
|
||||
}
|
||||
c.running.Store(ptr.To(false))
|
||||
c.running.Store(false)
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
@ -190,6 +191,15 @@ func (c *Cmd) Stop(tCtx ktesting.TContext, reason string) string {
|
|||
// Not started...
|
||||
return ""
|
||||
}
|
||||
if c.LogFile != "" {
|
||||
f, err := os.OpenFile(c.LogFile, os.O_WRONLY|os.O_APPEND, 0666)
|
||||
if err == nil {
|
||||
defer func() {
|
||||
_ = f.Close()
|
||||
}()
|
||||
_, _ = fmt.Fprintf(f, "%s: killing: %s\n", time.Now(), reason)
|
||||
}
|
||||
}
|
||||
c.cancel(reason)
|
||||
return c.wait(tCtx, true)
|
||||
}
|
||||
|
|
@ -197,6 +207,9 @@ func (c *Cmd) Stop(tCtx ktesting.TContext, reason string) string {
|
|||
func (c *Cmd) wait(tCtx ktesting.TContext, killed bool) string {
|
||||
tCtx.Helper()
|
||||
c.wg.Wait()
|
||||
if c.running.Load() {
|
||||
tCtx.Fatalf("command %s should have stopped but didn't", c.Name)
|
||||
}
|
||||
if !killed {
|
||||
tCtx.ExpectNoError(c.result, fmt.Sprintf("%s command failed, output:\n%s", c.Name, c.output.String()))
|
||||
}
|
||||
|
|
@ -204,7 +217,7 @@ func (c *Cmd) wait(tCtx ktesting.TContext, killed bool) string {
|
|||
}
|
||||
|
||||
func (c *Cmd) Running() bool {
|
||||
return ptr.Deref(c.running.Load(), false)
|
||||
return c.running.Load()
|
||||
}
|
||||
|
||||
func (c *Cmd) Output(tCtx ktesting.TContext) string {
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ type Cluster struct {
|
|||
// Kubernetes release. They will be invoked with parameters as defined in the
|
||||
// *current* local-up-cluster.sh. This works as long as local-up-cluster.sh in its
|
||||
// default configuration doesn't depend on something which was added only recently.
|
||||
func (c *Cluster) Start(tCtx ktesting.TContext, bindir string, localUpClusterEnv map[string]string) {
|
||||
func (c *Cluster) Start(tCtx ktesting.TContext, state string, bindir string, localUpClusterEnv map[string]string) {
|
||||
tCtx.Helper()
|
||||
c.Stop(tCtx)
|
||||
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
|
||||
|
|
@ -185,7 +185,7 @@ processLocalUpClusterOutput:
|
|||
c.Stop(tCtx)
|
||||
tCtx.Fatalf("interrupted cluster startup: %v", context.Cause(tCtx))
|
||||
case output := <-lines:
|
||||
if c.processLocalUpClusterOutput(tCtx, output) {
|
||||
if c.processLocalUpClusterOutput(tCtx, state, output) {
|
||||
break processLocalUpClusterOutput
|
||||
}
|
||||
}
|
||||
|
|
@ -196,7 +196,7 @@ processLocalUpClusterOutput:
|
|||
// Matches e.g. "+ API_SECURE_PORT=6443".
|
||||
var varAssignment = regexp.MustCompile(`^\+ ([A-Z0-9_]+)=(.*)$`)
|
||||
|
||||
func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Output) bool {
|
||||
func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, state string, output Output) bool {
|
||||
if output.EOF {
|
||||
if output.Line != "" {
|
||||
tCtx.Fatalf("%s output processing failed: %s", LocalUpCluster, output.Line)
|
||||
|
|
@ -217,7 +217,7 @@ func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Out
|
|||
|
||||
// Cluster components are kept running.
|
||||
if slices.Contains(KubeClusterComponents, KubeComponentName(name)) {
|
||||
c.runKubeComponent(tCtx, KubeComponentName(name), cmdLine)
|
||||
c.runKubeComponent(tCtx, state, KubeComponentName(name), cmdLine)
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
@ -241,19 +241,18 @@ func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Out
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *Cluster) runKubeComponent(tCtx ktesting.TContext, component KubeComponentName, command string) {
|
||||
func (c *Cluster) runKubeComponent(tCtx ktesting.TContext, state string, component KubeComponentName, command string) {
|
||||
commandLine := fromLocalUpClusterOutput(command)
|
||||
|
||||
cmd := &Cmd{
|
||||
Name: string(component),
|
||||
Name: string(component) + "-" + state,
|
||||
CommandLine: commandLine,
|
||||
// Number gets bumped when restarting.
|
||||
LogFile: path.Join(c.dir, fmt.Sprintf("%s-0.log", component)),
|
||||
LogFile: path.Join(c.dir, fmt.Sprintf("%s-%s.log", component, state)),
|
||||
// Stopped via Cluster.Stop.
|
||||
KeepRunning: true,
|
||||
}
|
||||
|
||||
c.runComponentWithRetry(tCtx, cmd)
|
||||
c.runComponentWithRetry(tCtx, component, cmd)
|
||||
}
|
||||
|
||||
func (c *Cluster) runCmd(tCtx ktesting.TContext, name, command string) {
|
||||
|
|
@ -360,9 +359,14 @@ func (m ModifyOptions) GetComponentFile(component KubeComponentName) string {
|
|||
}
|
||||
|
||||
// Modify changes the cluster as described in the options.
|
||||
// It returns options that can be passed to Modify unchanged
|
||||
//
|
||||
// The state description is required. It gets used as
|
||||
// file suffix for log files and in log messages.
|
||||
// It needs to be unique through the life of the cluster.
|
||||
//
|
||||
// The returned options can be passed to Modify unchanged
|
||||
// to restore the original state.
|
||||
func (c *Cluster) Modify(tCtx ktesting.TContext, options ModifyOptions) ModifyOptions {
|
||||
func (c *Cluster) Modify(tCtx ktesting.TContext, state string, options ModifyOptions) ModifyOptions {
|
||||
tCtx.Helper()
|
||||
|
||||
restore := ModifyOptions{
|
||||
|
|
@ -375,12 +379,12 @@ func (c *Cluster) Modify(tCtx ktesting.TContext, options ModifyOptions) ModifyOp
|
|||
slices.Reverse(components)
|
||||
}
|
||||
for _, component := range components {
|
||||
c.modifyComponent(tCtx, options, component, &restore)
|
||||
c.modifyComponent(tCtx, state, options, component, &restore)
|
||||
}
|
||||
return restore
|
||||
}
|
||||
|
||||
func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions, component KubeComponentName, restore *ModifyOptions) {
|
||||
func (c *Cluster) modifyComponent(tCtx ktesting.TContext, state string, options ModifyOptions, component KubeComponentName, restore *ModifyOptions) {
|
||||
tCtx.Helper()
|
||||
tCtx = tCtx.WithStep(fmt.Sprintf("modify %s", component))
|
||||
|
||||
|
|
@ -391,8 +395,11 @@ func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions,
|
|||
if !ok {
|
||||
tCtx.Fatal("not running")
|
||||
}
|
||||
tCtx.Logf("killing command %s before replacing it", cmd.Name)
|
||||
cmd.Stop(tCtx, "modifying the component")
|
||||
delete(c.running, component)
|
||||
tCtx.Logf("command %s with pid %d stopped: %s", cmd.Name, cmd.cmd.ProcessState.Pid(), cmd.cmd.ProcessState)
|
||||
dumpProcesses(tCtx)
|
||||
|
||||
// Find the command (might be wrapped by sudo!).
|
||||
cmdLine := slices.Clone(cmd.CommandLine)
|
||||
|
|
@ -408,21 +415,15 @@ func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions,
|
|||
if !found {
|
||||
tCtx.Fatal("binary filename not found")
|
||||
}
|
||||
cmd.Name = string(component) + "-" + state
|
||||
cmd.CommandLine = cmdLine
|
||||
cmd.LogFile = path.Join(c.dir, fmt.Sprintf("%s-%s.log", component, state))
|
||||
|
||||
// New log file.
|
||||
m := regexp.MustCompile(`^(.*)-([[:digit:]]+)\.log$`).FindStringSubmatch(cmd.LogFile)
|
||||
if m == nil {
|
||||
tCtx.Fatalf("unexpected log file, should have contained number: %s", cmd.LogFile)
|
||||
}
|
||||
logNum, _ := strconv.Atoi(m[2])
|
||||
cmd.LogFile = fmt.Sprintf("%s-%d.log", m[1], logNum+1)
|
||||
|
||||
c.runComponentWithRetry(tCtx, cmd)
|
||||
c.runComponentWithRetry(tCtx, component, cmd)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) {
|
||||
func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, component KubeComponentName, cmd *Cmd) {
|
||||
// Sometimes components fail to come up. We have to retry.
|
||||
//
|
||||
// For example, the apiserver's port might not be free again yet (no SO_LINGER!).
|
||||
|
|
@ -434,7 +435,7 @@ func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) {
|
|||
for i := 0; ; i++ {
|
||||
tCtx.Logf("running %s with output redirected to %s", cmd.Name, cmd.LogFile)
|
||||
cmd.Start(tCtx)
|
||||
c.running[KubeComponentName(cmd.Name)] = cmd
|
||||
c.running[component] = cmd
|
||||
err := func() (finalErr error) {
|
||||
tCtx, finalize := tCtx.WithError(&finalErr)
|
||||
defer finalize()
|
||||
|
|
@ -452,6 +453,8 @@ func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) {
|
|||
// Re-raise the failure.
|
||||
tCtx.ExpectNoError(err)
|
||||
}
|
||||
tCtx.Logf("started %s with pid %d", cmd.Name, cmd.cmd.Process.Pid)
|
||||
dumpProcesses(tCtx)
|
||||
}
|
||||
|
||||
func (c *Cluster) checkReadiness(tCtx ktesting.TContext, cmd *Cmd) {
|
||||
|
|
@ -459,16 +462,16 @@ func (c *Cluster) checkReadiness(tCtx ktesting.TContext, cmd *Cmd) {
|
|||
tCtx = tCtx.WithRESTConfig(restConfig)
|
||||
tCtx = tCtx.WithStep(fmt.Sprintf("wait for %s readiness", cmd.Name))
|
||||
|
||||
switch KubeComponentName(cmd.Name) {
|
||||
case KubeAPIServer:
|
||||
switch {
|
||||
case strings.HasPrefix(cmd.Name, string(KubeAPIServer)):
|
||||
c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["API_SECURE_PORT"])
|
||||
case KubeScheduler:
|
||||
case strings.HasPrefix(cmd.Name, string(KubeScheduler)):
|
||||
c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["SCHEDULER_SECURE_PORT"])
|
||||
case KubeControllerManager:
|
||||
case strings.HasPrefix(cmd.Name, string(KubeControllerManager)):
|
||||
c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["KCM_SECURE_PORT"])
|
||||
case KubeProxy:
|
||||
case strings.HasPrefix(cmd.Name, string(KubeProxy)):
|
||||
c.checkHealthz(tCtx, cmd, "http" /* not an error! */, c.settings["API_HOST_IP"], c.settings["PROXY_HEALTHZ_PORT"])
|
||||
case Kubelet:
|
||||
case strings.HasPrefix(cmd.Name, string(Kubelet)):
|
||||
c.checkHealthz(tCtx, cmd, "https", c.settings["KUBELET_HOST"], c.settings["KUBELET_PORT"])
|
||||
|
||||
// Also wait for the node to be ready.
|
||||
|
|
@ -508,3 +511,16 @@ func (c *Cluster) checkHealthz(tCtx ktesting.TContext, cmd *Cmd, method, hostIP,
|
|||
return nil
|
||||
}).Should(gomega.Succeed(), fmt.Sprintf("HTTP GET %s", url))
|
||||
}
|
||||
|
||||
func dumpProcesses(tCtx ktesting.TContext) {
|
||||
// Uncomment the code to get additional debug output.
|
||||
//
|
||||
// cmd := &Cmd{
|
||||
// Name: "ps",
|
||||
// CommandLine: []string{"ps", "-efww", "--forest"},
|
||||
// GatherOutput: true,
|
||||
// }
|
||||
// cmd.Start(tCtx)
|
||||
// processes := cmd.Wait(tCtx)
|
||||
// tCtx.Log(processes)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue