Merge pull request #137003 from hoteye/pr1-kubelet-podworkers-core

kubelet: tighten podworkers context flow and consolidate kubelet logcheck scope
This commit is contained in:
Kubernetes Prow Robot 2026-03-04 04:30:27 +05:30 committed by GitHub
commit ba4ac6d65b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 349 additions and 424 deletions

View file

@ -345,50 +345,13 @@ linters:
contextual k8s.io/kubernetes/pkg/securitycontext/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/test/images/sample-device-plugin/.*
contextual k8s.io/kubernetes/pkg/kubelet/allocation/.*
contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/checkpointmanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/client/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/.*
contextual k8s.io/kubernetes/pkg/kubelet/config/.*
contextual k8s.io/kubernetes/pkg/kubelet/configmap/.*
contextual k8s.io/kubernetes/pkg/kubelet/container/.*
contextual k8s.io/kubernetes/pkg/kubelet/envvars/.*
contextual k8s.io/kubernetes/pkg/kubelet/events/.*
contextual k8s.io/kubernetes/pkg/kubelet/eviction/.*
contextual k8s.io/kubernetes/pkg/kubelet/images/.*
contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*
contextual k8s.io/kubernetes/pkg/kubelet/logs/.*
contextual k8s.io/kubernetes/pkg/kubelet/metrics/.*
contextual k8s.io/kubernetes/pkg/kubelet/network/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodestatus/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/pluginmanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
contextual k8s.io/kubernetes/pkg/kubelet/podcertificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
contextual k8s.io/kubernetes/pkg/kubelet/prober/.*
contextual k8s.io/kubernetes/pkg/kubelet/qos/.*
contextual k8s.io/kubernetes/pkg/kubelet/runtimeclass/.*
contextual k8s.io/kubernetes/pkg/kubelet/secret/.*
contextual k8s.io/kubernetes/pkg/kubelet/server/.*
contextual k8s.io/kubernetes/pkg/kubelet/stats/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/types/.*
contextual k8s.io/kubernetes/pkg/kubelet/userns/.*
contextual k8s.io/kubernetes/pkg/kubelet/util/.*
contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/watchdog/.*
contextual k8s.io/kubernetes/pkg/kubelet/winstats/.*
contextual k8s.io/kubernetes/pkg/kubelet/.*
# Temporary carve-outs for PR1 split safety: these files still have legacy global klog calls.
# TODO: remove each exclusion as follow-up PRs migrate the file to contextual logging.
-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_resources.go
-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_volumes.go
-contextual k8s.io/kubernetes/pkg/kubelet/pod_container_deletor.go
-contextual k8s.io/kubernetes/pkg/kubelet/volume_host.go
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View file

@ -358,50 +358,13 @@ linters:
contextual k8s.io/kubernetes/pkg/securitycontext/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/test/images/sample-device-plugin/.*
contextual k8s.io/kubernetes/pkg/kubelet/allocation/.*
contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/checkpointmanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/client/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/.*
contextual k8s.io/kubernetes/pkg/kubelet/config/.*
contextual k8s.io/kubernetes/pkg/kubelet/configmap/.*
contextual k8s.io/kubernetes/pkg/kubelet/container/.*
contextual k8s.io/kubernetes/pkg/kubelet/envvars/.*
contextual k8s.io/kubernetes/pkg/kubelet/events/.*
contextual k8s.io/kubernetes/pkg/kubelet/eviction/.*
contextual k8s.io/kubernetes/pkg/kubelet/images/.*
contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*
contextual k8s.io/kubernetes/pkg/kubelet/logs/.*
contextual k8s.io/kubernetes/pkg/kubelet/metrics/.*
contextual k8s.io/kubernetes/pkg/kubelet/network/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodestatus/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/pluginmanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
contextual k8s.io/kubernetes/pkg/kubelet/podcertificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
contextual k8s.io/kubernetes/pkg/kubelet/prober/.*
contextual k8s.io/kubernetes/pkg/kubelet/qos/.*
contextual k8s.io/kubernetes/pkg/kubelet/runtimeclass/.*
contextual k8s.io/kubernetes/pkg/kubelet/secret/.*
contextual k8s.io/kubernetes/pkg/kubelet/server/.*
contextual k8s.io/kubernetes/pkg/kubelet/stats/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/types/.*
contextual k8s.io/kubernetes/pkg/kubelet/userns/.*
contextual k8s.io/kubernetes/pkg/kubelet/util/.*
contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/watchdog/.*
contextual k8s.io/kubernetes/pkg/kubelet/winstats/.*
contextual k8s.io/kubernetes/pkg/kubelet/.*
# Temporary carve-outs for PR1 split safety: these files still have legacy global klog calls.
# TODO: remove each exclusion as follow-up PRs migrate the file to contextual logging.
-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_resources.go
-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_volumes.go
-contextual k8s.io/kubernetes/pkg/kubelet/pod_container_deletor.go
-contextual k8s.io/kubernetes/pkg/kubelet/volume_host.go
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View file

@ -64,50 +64,13 @@ contextual k8s.io/kubernetes/pkg/security/.*
contextual k8s.io/kubernetes/pkg/securitycontext/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/test/images/sample-device-plugin/.*
contextual k8s.io/kubernetes/pkg/kubelet/allocation/.*
contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/checkpointmanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/client/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/.*
contextual k8s.io/kubernetes/pkg/kubelet/config/.*
contextual k8s.io/kubernetes/pkg/kubelet/configmap/.*
contextual k8s.io/kubernetes/pkg/kubelet/container/.*
contextual k8s.io/kubernetes/pkg/kubelet/envvars/.*
contextual k8s.io/kubernetes/pkg/kubelet/events/.*
contextual k8s.io/kubernetes/pkg/kubelet/eviction/.*
contextual k8s.io/kubernetes/pkg/kubelet/images/.*
contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*
contextual k8s.io/kubernetes/pkg/kubelet/logs/.*
contextual k8s.io/kubernetes/pkg/kubelet/metrics/.*
contextual k8s.io/kubernetes/pkg/kubelet/network/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodestatus/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/pluginmanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
contextual k8s.io/kubernetes/pkg/kubelet/podcertificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
contextual k8s.io/kubernetes/pkg/kubelet/prober/.*
contextual k8s.io/kubernetes/pkg/kubelet/qos/.*
contextual k8s.io/kubernetes/pkg/kubelet/runtimeclass/.*
contextual k8s.io/kubernetes/pkg/kubelet/secret/.*
contextual k8s.io/kubernetes/pkg/kubelet/server/.*
contextual k8s.io/kubernetes/pkg/kubelet/stats/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/types/.*
contextual k8s.io/kubernetes/pkg/kubelet/userns/.*
contextual k8s.io/kubernetes/pkg/kubelet/util/.*
contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/watchdog/.*
contextual k8s.io/kubernetes/pkg/kubelet/winstats/.*
contextual k8s.io/kubernetes/pkg/kubelet/.*
# Temporary carve-outs for PR1 split safety: these files still have legacy global klog calls.
# TODO: remove each exclusion as follow-up PRs migrate the file to contextual logging.
-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_resources.go
-contextual k8s.io/kubernetes/pkg/kubelet/kubelet_volumes.go
-contextual k8s.io/kubernetes/pkg/kubelet/pod_container_deletor.go
-contextual k8s.io/kubernetes/pkg/kubelet/volume_host.go
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View file

@ -1018,7 +1018,7 @@ func NewMainKubelet(ctx context.Context,
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig,
killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
killPodNow(ctx, klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
klet.evictionManager = evictionManager
handlers := []lifecycle.PodAdmitHandler{}
@ -1058,7 +1058,7 @@ func NewMainKubelet(ctx context.Context,
handlers = append(handlers, klet.containerManager.GetAllocateResourcesPodAdmitHandler())
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getAllocatedPods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getAllocatedPods, killPodNow(ctx, klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
handlers = append(handlers, lifecycle.NewPredicateAdmitHandler(klet.GetCachedNode, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's
for _, opt := range kubeDeps.Options {
@ -1097,7 +1097,7 @@ func NewMainKubelet(ctx context.Context,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
GetPodsFunc: klet.GetActivePods,
KillPodFunc: killPodNow(klet.podWorkers, kubeDeps.Recorder),
KillPodFunc: killPodNow(ctx, klet.podWorkers, kubeDeps.Recorder),
SyncNodeStatusFunc: klet.syncNodeStatus,
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration,
ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
@ -2451,7 +2451,7 @@ func (kl *Kubelet) getPodsToSync() []*v1.Pod {
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(logger klog.Logger, pod *v1.Pod) error {
func (kl *Kubelet) deletePod(ctx context.Context, logger klog.Logger, pod *v1.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
}
@ -2461,7 +2461,7 @@ func (kl *Kubelet) deletePod(logger klog.Logger, pod *v1.Pod) error {
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
logger.V(3).Info("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
@ -2629,7 +2629,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
kl.cleanUpContainersInPod(ctx, e.ID, containerID)
}
}
case <-syncCh:
@ -2734,7 +2734,7 @@ func (kl *Kubelet) HandlePodAdditions(ctx context.Context, pods []*v1.Pod) {
logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
@ -2774,7 +2774,7 @@ func (kl *Kubelet) HandlePodAdditions(ctx context.Context, pods []*v1.Pod) {
}
}
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate,
@ -2846,7 +2846,7 @@ func (kl *Kubelet) HandlePodUpdates(ctx context.Context, pods []*v1.Pod) {
}
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
@ -2955,7 +2955,7 @@ func (kl *Kubelet) HandlePodRemoves(ctx context.Context, pods []*v1.Pod) {
logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
@ -2966,7 +2966,7 @@ func (kl *Kubelet) HandlePodRemoves(ctx context.Context, pods []*v1.Pod) {
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(logger, pod); err != nil {
if err := kl.deletePod(ctx, logger, pod); err != nil {
logger.V(2).Info("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
}
@ -3041,7 +3041,7 @@ func (kl *Kubelet) HandlePodReconcile(ctx context.Context, pods []*v1.Pod) {
// be different than Sync, or if there is a better place for it. For instance, we have
// needsReconcile in kubelet/config, here, and in status_manager.
if status.NeedToReconcilePodReadiness(pod) {
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
@ -3086,7 +3086,7 @@ func (kl *Kubelet) HandlePodSyncs(ctx context.Context, pods []*v1.Pod) {
logger.V(3).Info("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
@ -3234,7 +3234,7 @@ func (kl *Kubelet) ListenAndServePodResources(ctx context.Context) {
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
func (kl *Kubelet) cleanUpContainersInPod(ctx context.Context, podID types.UID, exitedContainerID string) {
if podStatus, err := kl.podCache.Get(podID); err == nil {
// When an evicted or deleted pod has already synced, all containers can be removed.
removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)

View file

@ -1232,7 +1232,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// Stop the workers for terminated pods not in the config source
logger.V(3).Info("Clean up pod workers for terminated pods")
workingPods := kl.podWorkers.SyncKnownPods(allPods)
workingPods := kl.podWorkers.SyncKnownPods(logger, allPods)
// Reconcile: At this point the pod workers have been pruned to the set of
// desired pods. Pods that must be restarted due to UID reuse, or leftover
@ -1357,7 +1357,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
logger.V(2).Info("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: pod,
MirrorPod: mirrorPod,
@ -1386,7 +1386,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// next invocation of HandlePodCleanups.
for _, pod := range kl.filterTerminalPodsToDelete(allPods, runningRuntimePods, workingPods) {
logger.V(3).Info("Handling termination and deletion of the pod to pod workers", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
Pod: pod,
})
@ -1409,7 +1409,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
PodTerminationGracePeriodSecondsOverride: &one,
}
logger.V(2).Info("Clean up containers for orphaned pod we had not seen before", "podUID", runningPod.ID, "killPodOptions", killPodOptions)
kl.podWorkers.UpdatePod(UpdatePodOptions{
kl.podWorkers.UpdatePod(ctx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
RunningPod: runningPod,
KillPodOptions: killPodOptions,

View file

@ -49,7 +49,6 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
"k8s.io/kubelet/pkg/cri/streaming/remotecommand"
_ "k8s.io/kubernetes/pkg/apis/core/install"
@ -6450,7 +6449,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
prepareWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) {
// send a create
pod := simplePod()
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -6475,7 +6474,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
},
},
}
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
StartTime: time.Unix(3, 0).UTC(),
Pod: updatedPod,
@ -6851,7 +6850,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
},
},
}
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -6874,7 +6873,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
},
},
}
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
StartTime: time.Unix(3, 0).UTC(),
Pod: updatedPod,
@ -6923,7 +6922,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
// block startup of the static pod due to full name collision
w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2")
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -6996,7 +6995,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
// block startup of the static pod due to full name collision
w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2")
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -7059,7 +7058,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
// block startup of the static pod due to full name collision
w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2")
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -7104,7 +7103,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
// send a create of a static pod
pod := staticPod()
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -7112,14 +7111,14 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
drainAllWorkers(w)
// terminate the pod (which won't complete) and then deliver a recreate by that same UID
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
StartTime: time.Unix(2, 0).UTC(),
Pod: pod,
})
pod = staticPod()
pod.Annotations["version"] = "2"
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(3, 0).UTC(),
Pod: pod,
@ -7213,7 +7212,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
// send a create of a static pod
pod := staticPod()
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -7256,7 +7255,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
// send a create of a static pod
pod := staticPod()
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
@ -7343,19 +7342,19 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
prepareWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) {
// simulate a delete and recreate of the static pod
pod := simplePod()
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
})
drainAllWorkers(w)
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
Pod: pod,
})
pod2 := simplePod()
pod2.Annotations = map[string]string{"version": "2"}
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: pod2,
})
@ -7444,7 +7443,8 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
podWorkers, _, processed := createPodWorkers()
logger, _ := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
kl.podWorkers = podWorkers
originalPodSyncer := podWorkers.podSyncer
syncFuncs := newPodSyncerFuncs(originalPodSyncer)
@ -7517,8 +7517,7 @@ func testMetric(t *testing.T, metricName string, expectedMetric string) {
}
func TestGetNonExistentImagePullSecret(t *testing.T) {
_, tCtx := ktesting.NewTestContext(t)
logger := klog.FromContext(tCtx)
logger, _ := ktesting.NewTestContext(t)
secrets := make([]*v1.Secret, 0)
fakeRecorder := record.NewFakeRecorder(1)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)

View file

@ -148,7 +148,8 @@ type fakeImageGCManager struct {
}
func (f *fakeImageGCManager) GetImageList() ([]kubecontainer.Image, error) {
return f.fakeImageService.ListImages(context.Background())
// ImageGCManager interface does not accept a context parameter.
return f.fakeImageService.ListImages(context.TODO())
}
type TestKubelet struct {
@ -390,7 +391,7 @@ func newTestKubeletWithImageList(
}
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{},
killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())
killPodNow(tCtx, kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())
kubelet.evictionManager = evictionManager
handlers := []lifecycle.PodAdmitHandler{}
@ -403,7 +404,7 @@ func newTestKubeletWithImageList(
Recorder: fakeRecorder,
NodeRef: nodeRef,
GetPodsFunc: kubelet.podManager.GetPods,
KillPodFunc: killPodNow(kubelet.podWorkers, fakeRecorder),
KillPodFunc: killPodNow(tCtx, kubelet.podWorkers, fakeRecorder),
SyncNodeStatusFunc: func(context.Context) {},
ShutdownGracePeriodRequested: 0,
ShutdownGracePeriodCriticalPods: 0,
@ -481,7 +482,7 @@ func newTestPods(count int) []*v1.Pod {
}
func TestSyncLoopAbort(t *testing.T) {
ctx := context.Background()
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -494,15 +495,15 @@ func TestSyncLoopAbort(t *testing.T) {
close(ch)
// sanity check (also prevent this test from hanging in the next step)
ok := kubelet.syncLoopIteration(ctx, ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
ok := kubelet.syncLoopIteration(tCtx, ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
require.False(t, ok, "Expected syncLoopIteration to return !ok since update chan was closed")
// this should terminate immediately; if it hangs then the syncLoopIteration isn't aborting properly
kubelet.syncLoop(ctx, ch, kubelet)
kubelet.syncLoop(tCtx, ch, kubelet)
}
func TestSyncPodsStartPod(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -515,12 +516,12 @@ func TestSyncPodsStartPod(t *testing.T) {
}),
}
kubelet.podManager.SetPods(pods)
kubelet.HandlePodSyncs(ctx, pods)
kubelet.HandlePodSyncs(tCtx, pods)
fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
}
func TestHandlePodCleanupsPerQOS(t *testing.T) {
ctx := context.Background()
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -548,7 +549,7 @@ func TestHandlePodCleanupsPerQOS(t *testing.T) {
// within a goroutine so a two second delay should be enough time to
// mark the pod as killed (within this test case).
kubelet.HandlePodCleanups(ctx)
require.NoError(t, kubelet.HandlePodCleanups(tCtx))
// assert that unwanted pods were killed
if actual, expected := kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion, []types.UID{"12345678"}; !reflect.DeepEqual(actual, expected) {
@ -559,9 +560,9 @@ func TestHandlePodCleanupsPerQOS(t *testing.T) {
// simulate Runtime.KillPod
fakeRuntime.PodList = nil
kubelet.HandlePodCleanups(ctx)
kubelet.HandlePodCleanups(ctx)
kubelet.HandlePodCleanups(ctx)
require.NoError(t, kubelet.HandlePodCleanups(tCtx))
require.NoError(t, kubelet.HandlePodCleanups(tCtx))
require.NoError(t, kubelet.HandlePodCleanups(tCtx))
destroyCount := 0
err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
@ -585,6 +586,7 @@ func TestHandlePodCleanupsPerQOS(t *testing.T) {
}
func TestDispatchWorkOfCompletedPod(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -655,7 +657,7 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
},
}
for _, pod := range pods {
kubelet.podWorkers.UpdatePod(UpdatePodOptions{
kubelet.podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodSync,
StartTime: time.Now(),
@ -668,6 +670,7 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
}
func TestDispatchWorkOfActivePod(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -713,7 +716,7 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
}
for _, pod := range pods {
kubelet.podWorkers.UpdatePod(UpdatePodOptions{
kubelet.podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodSync,
StartTime: time.Now(),
@ -755,7 +758,7 @@ func TestHandlePodCleanups(t *testing.T) {
}
func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
const podCount = 500
tk := newTestKubelet(t, true /* controller-attach-detach enabled */)
defer tk.Cleanup()
@ -784,11 +787,11 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
pods, _ := newTestPodsWithResources(podCount)
kl.podManager.SetPods(pods)
kl.HandlePodSyncs(ctx, pods)
kl.HandlePodSyncs(tCtx, pods)
// all pods must reach a terminal, Failed state due to VolumeAttachmentLimitExceeded.
if err := wait.PollUntilContextTimeout(
ctx, 200*time.Millisecond, 30*time.Second, true,
tCtx, 200*time.Millisecond, 30*time.Second, true,
func(ctx context.Context) (bool, error) {
for _, p := range pods {
st, ok := kl.statusManager.GetPodStatus(p.UID)
@ -803,7 +806,7 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
// validate that SyncTerminatedPod completed successfully for each pod.
if err := wait.PollUntilContextTimeout(
ctx, 200*time.Millisecond, 30*time.Second, true,
tCtx, 200*time.Millisecond, 30*time.Second, true,
func(ctx context.Context) (bool, error) {
for _, p := range pods {
if !kl.podWorkers.ShouldPodBeFinished(p.UID) {
@ -856,7 +859,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
ready := false
@ -883,7 +886,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
kubelet := testKubelet.kubelet
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.Set[string]) bool { return ready })
kubelet.HandlePodRemoves(ctx, pods)
kubelet.HandlePodRemoves(tCtx, pods)
time.Sleep(2 * time.Second)
// Sources are not ready yet. Don't remove any pods.
@ -892,7 +895,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
}
ready = true
kubelet.HandlePodRemoves(ctx, pods)
kubelet.HandlePodRemoves(tCtx, pods)
time.Sleep(2 * time.Second)
// Sources are ready. Remove unwanted pods.
@ -927,7 +930,7 @@ func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) {
// Tests that we handle port conflicts correctly by setting the failed status in status map.
func TestHandlePortConflicts(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -969,7 +972,7 @@ func TestHandlePortConflicts(t *testing.T) {
pods[1].UID: true,
}
kl.HandlePodAdditions(ctx, pods)
kl.HandlePodAdditions(tCtx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -978,7 +981,7 @@ func TestHandlePortConflicts(t *testing.T) {
// Tests that we handle host name conflicts correctly by setting the failed status in status map.
func TestHandleHostNameConflicts(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1013,7 +1016,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
notfittingPod := pods[0]
fittingPod := pods[1]
kl.HandlePodAdditions(ctx, pods)
kl.HandlePodAdditions(tCtx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -1022,7 +1025,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
// Tests that we handle not matching labels selector correctly by setting the failed status in status map.
func TestHandleNodeSelector(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1056,7 +1059,7 @@ func TestHandleNodeSelector(t *testing.T) {
notfittingPod := pods[0]
fittingPod := pods[1]
kl.HandlePodAdditions(ctx, pods)
kl.HandlePodAdditions(tCtx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -1093,7 +1096,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1121,7 +1124,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
pod := podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: test.podSelector})
kl.HandlePodAdditions(ctx, []*v1.Pod{pod})
kl.HandlePodAdditions(tCtx, []*v1.Pod{pod})
// Check pod status stored in the status map.
checkPodStatus(t, kl, pod, test.podStatus)
@ -1131,7 +1134,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
// Tests that we handle exceeded resources correctly by setting the failed status in status map.
func TestHandleMemExceeded(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1177,7 +1180,7 @@ func TestHandleMemExceeded(t *testing.T) {
pods[1].UID: true,
}
kl.HandlePodAdditions(ctx, pods)
kl.HandlePodAdditions(tCtx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -1187,7 +1190,7 @@ func TestHandleMemExceeded(t *testing.T) {
// Tests that we handle result of interface UpdatePluginResources correctly
// by setting corresponding status in status map.
func TestHandlePluginResources(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, false /*enableResizing*/)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1309,7 +1312,7 @@ func TestHandlePluginResources(t *testing.T) {
missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec)
failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec)
kl.HandlePodAdditions(ctx, []*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})
kl.HandlePodAdditions(tCtx, []*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})
// Check pod status stored in the status map.
checkPodStatus(t, kl, fittingPod, v1.PodPending)
@ -1320,7 +1323,7 @@ func TestHandlePluginResources(t *testing.T) {
// TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1331,13 +1334,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
}
podToTest := pods[1]
// Run once to populate the status map.
kl.HandlePodAdditions(ctx, pods)
kl.HandlePodAdditions(tCtx, pods)
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
t.Fatalf("expected to have status cached for pod2")
}
// Sync with empty pods so that the entry in status map will be removed.
kl.podManager.SetPods([]*v1.Pod{})
kl.HandlePodCleanups(ctx)
require.NoError(t, kl.HandlePodCleanups(tCtx))
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
t.Fatalf("expected to not have status cached for pod2")
}
@ -1835,7 +1838,7 @@ func TestCheckpointContainer(t *testing.T) {
}
func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
fakeRuntime := testKubelet.fakeRuntime
@ -1876,7 +1879,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}
// Let the pod worker set the status to failed after this sync.
kubelet.HandlePodUpdates(ctx, pods)
kubelet.HandlePodUpdates(tCtx, pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
assert.Equal(t, v1.PodFailed, status.Phase)
@ -1885,7 +1888,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}
func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
fakeRuntime := testKubelet.fakeRuntime
@ -1927,7 +1930,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
}
kubelet.podManager.SetPods(pods)
kubelet.HandlePodUpdates(ctx, pods)
kubelet.HandlePodUpdates(tCtx, pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
assert.NotEqual(t, v1.PodFailed, status.Phase)
@ -1951,7 +1954,7 @@ func podWithUIDNameNsSpec(uid types.UID, name, namespace string, spec v1.PodSpec
}
func TestDeletePodDirsForDeletedPods(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1962,26 +1965,26 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) {
kl.podManager.SetPods(pods)
// Sync to create pod directories.
kl.HandlePodSyncs(ctx, kl.podManager.GetPods())
kl.HandlePodSyncs(tCtx, kl.podManager.GetPods())
for i := range pods {
assert.True(t, dirExists(kl.getPodDir(pods[i].UID)), "Expected directory to exist for pod %d", i)
}
// Pod 1 has been deleted and no longer exists.
kl.podManager.SetPods([]*v1.Pod{pods[0]})
kl.HandlePodCleanups(ctx)
require.NoError(t, kl.HandlePodCleanups(tCtx))
assert.True(t, dirExists(kl.getPodDir(pods[0].UID)), "Expected directory to exist for pod 0")
assert.False(t, dirExists(kl.getPodDir(pods[1].UID)), "Expected directory to be deleted for pod 1")
}
func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod, podsToCheck []*v1.Pod, shouldExist bool) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
t.Helper()
kl := testKubelet.kubelet
kl.podManager.SetPods(pods)
kl.HandlePodSyncs(ctx, pods)
kl.HandlePodCleanups(ctx)
kl.HandlePodSyncs(tCtx, pods)
require.NoError(t, kl.HandlePodCleanups(tCtx))
for i, pod := range podsToCheck {
exist := dirExists(kl.getPodDir(pod.UID))
assert.Equal(t, shouldExist, exist, "directory of pod %d", i)
@ -2714,7 +2717,7 @@ func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecyc
// Test verifies that the kubelet invokes an admission handler during HandlePodAdditions.
func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -2751,7 +2754,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
kl.allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{&testPodAdmitHandler{podsToReject: podsToReject}})
kl.HandlePodAdditions(ctx, pods)
kl.HandlePodAdditions(tCtx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, podToReject, v1.PodFailed)
@ -2954,7 +2957,7 @@ func TestPodResourceAllocationReset(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
if tc.existingPodAllocation != nil {
// when kubelet restarts, AllocatedResources has already existed before adding pod
err := kubelet.allocationManager.SetAllocatedResources(tc.existingPodAllocation)
@ -2962,7 +2965,7 @@ func TestPodResourceAllocationReset(t *testing.T) {
t.Fatalf("failed to set pod allocation: %v", err)
}
}
kubelet.HandlePodAdditions(ctx, []*v1.Pod{tc.pod})
kubelet.HandlePodAdditions(tCtx, []*v1.Pod{tc.pod})
allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(tc.pod.UID, tc.pod.Spec.Containers[0].Name)
if !found {
@ -3046,6 +3049,7 @@ func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
}
func TestSyncTerminatingPodKillPod(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -3060,7 +3064,7 @@ func TestSyncTerminatingPodKillPod(t *testing.T) {
kl.podManager.SetPods(pods)
podStatus := &kubecontainer.PodStatus{ID: pod.UID}
gracePeriodOverride := int64(0)
err := kl.SyncTerminatingPod(context.Background(), pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
err := kl.SyncTerminatingPod(tCtx, pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
podStatus.Phase = v1.PodFailed
podStatus.Reason = "reason"
podStatus.Message = "message"
@ -3072,7 +3076,6 @@ func TestSyncTerminatingPodKillPod(t *testing.T) {
}
func TestSyncLabels(t *testing.T) {
tCtx := ktesting.Init(t)
tests := []struct {
name string
existingNode *v1.Node
@ -3102,6 +3105,7 @@ func TestSyncLabels(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -3366,8 +3370,8 @@ func TestNewMainKubeletStandAlone(t *testing.T) {
assert.NotNil(t, testMainKubelet, "testMainKubelet should not be nil")
testMainKubelet.BirthCry()
ctx := ktesting.Init(t)
testMainKubelet.StartGarbageCollection(ctx)
tCtx = ktesting.Init(t)
testMainKubelet.StartGarbageCollection(tCtx)
// Nil pointer panic can be reproduced if configmap manager is not nil.
// See https://github.com/kubernetes/kubernetes/issues/113492
// pod := &v1.Pod{
@ -3492,7 +3496,7 @@ func TestSyncPodSpans(t *testing.T) {
EnableServiceLinks: ptr.To(false),
})
_, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{})
_, err = kubelet.SyncPod(tCtx, kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{})
require.NoError(t, err)
assert.NotEmpty(t, exp.GetSpans())
@ -3825,6 +3829,7 @@ func TestCrashLoopBackOffConfiguration(t *testing.T) {
}
func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -3917,7 +3922,7 @@ func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
testKubelet.fakeRuntime.SyncResults = tc.syncResults
testKubelet.fakeRuntime.PodResizeInProgress = tc.podResizeInProgress
isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kubelet.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
require.False(t, isTerminal)
if tc.expectedErr == "" {
require.NoError(t, err)
@ -3951,7 +3956,7 @@ func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) {
}
func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
metrics.Register()
metrics.ContainerRequestedResizes.Reset()
@ -4533,7 +4538,7 @@ func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) {
kubelet.podManager.AddPod(initialPod)
require.NoError(t, kubelet.allocationManager.SetAllocatedResources(initialPod))
kubelet.HandlePodUpdates(ctx, []*v1.Pod{updatedPod})
kubelet.HandlePodUpdates(tCtx, []*v1.Pod{updatedPod})
tc.updateExpectedFunc(&expectedMetrics)
@ -4596,7 +4601,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
t.Skip("InPlacePodVerticalScaling is not currently supported for Windows")
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, true /*enableResizing*/)
defer testKubelet.Cleanup()
@ -4729,7 +4734,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
kubelet.allocationManager.PushPendingResize(pendingResizeDesired.UID)
kubelet.statusManager.ClearPodResizePendingCondition(pendingResizeDesired.UID)
kubelet.HandlePodReconcile(ctx, []*v1.Pod{tc.newPod})
kubelet.HandlePodReconcile(tCtx, []*v1.Pod{tc.newPod})
require.Equal(t, tc.shouldRetryPendingResize, kubelet.statusManager.IsPodResizeDeferred(pendingResizeDesired.UID))
kubelet.allocationManager.RemovePod(pendingResizeDesired.UID)
@ -4740,7 +4745,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
}
func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) {
ctx := ktesting.Init(t)
tCtx := ktesting.Init(t)
cpu1000m := resource.MustParse("1")
mem1000M := resource.MustParse("1Gi")
cpu2000m := resource.MustParse("2")
@ -4861,7 +4866,7 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger := klog.FromContext(ctx)
logger := klog.FromContext(tCtx)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeDeclaredFeatures, tc.featureGateEnabled)
testKubelet := newTestKubelet(t, false)
@ -4884,7 +4889,7 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) {
}
kubelet.statusManager.SetPodStatus(logger, tc.newPod, v1.PodStatus{Phase: v1.PodRunning})
kubelet.HandlePodUpdates(ctx, []*v1.Pod{tc.newPod})
kubelet.HandlePodUpdates(tCtx, []*v1.Pod{tc.newPod})
if tc.expectEvent {
select {
case event := <-recorder.Events:

View file

@ -164,7 +164,7 @@ type PodWorkers interface {
// UpdatePod() calls will be ignored for that pod until it has been forgotten
// due to significant time passing. A pod that is terminated will never be
// restarted.
UpdatePod(options UpdatePodOptions)
UpdatePod(ctx context.Context, options UpdatePodOptions)
// SyncKnownPods removes workers for pods that are not in the desiredPods set
// and have been terminated for a significant period of time. Once this method
// has been called once, the workers are assumed to be fully initialized and
@ -172,7 +172,7 @@ type PodWorkers interface {
// true. It returns a map describing the state of each known pod worker. It
// is the responsibility of the caller to re-add any desired pods that are not
// returned as knownPods.
SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)
SyncKnownPods(logger klog.Logger, desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)
// IsPodKnownTerminated returns true once SyncTerminatingPod completes
// successfully - the provided pod UID it is known by the pod
@ -335,11 +335,6 @@ const (
// podSyncStatus tracks per-pod transitions through the three phases of pod
// worker sync (setup, terminating, terminated).
type podSyncStatus struct {
// ctx is the context that is associated with the current pod sync.
// TODO: remove this from the struct by having the context initialized
// in startPodSync, the cancelFn used by UpdatePod, and cancellation of
// a parent context for tearing down workers (if needed) on shutdown
ctx context.Context
// cancelFn if set is expected to cancel the current podSyncer operation.
cancelFn context.CancelFunc
@ -752,7 +747,8 @@ func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if: deleted on the apiserver,
// discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
func (p *podWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) {
logger := klog.FromContext(ctx)
// Handle when the pod is an orphan (no config) and we only have runtime status by running only
// the terminating part of the lifecycle. A running pod contains only a minimal set of information
// about the pod
@ -763,7 +759,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
if options.Pod == nil {
// the synthetic pod created here is used only as a placeholder and not tracked
if options.UpdateType != kubetypes.SyncPodKill {
klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType)
logger.Info("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType)
return
}
uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name
@ -771,12 +767,13 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
} else {
options.RunningPod = nil
uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
logger.Info("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
}
} else {
uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
}
updateLogger := logger.WithValues("pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
p.podLock.Lock()
defer p.podLock.Unlock()
@ -785,7 +782,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
now := p.clock.Now()
status, ok := p.podSyncStatuses[uid]
if !ok {
klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is being synced for the first time")
firstTime = true
status = &podSyncStatus{
syncedAt: now,
@ -850,14 +847,14 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
if !firstTime && status.IsTerminationRequested() {
if options.UpdateType == kubetypes.SyncPodCreate {
status.restartRequested = true
klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is terminating but has been requested to restart with same UID, will be reconciled later")
return
}
}
// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
if status.IsFinished() {
klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is finished processing, no further updates")
return
}
@ -866,25 +863,25 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
if !status.IsTerminationRequested() {
switch {
case isRuntimePod:
klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is orphaned and must be torn down")
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.DeletionTimestamp != nil:
klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is marked for graceful deletion, begin teardown")
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is in a terminal phase (success/failed), begin teardown")
status.terminatingAt = now
becameTerminating = true
case options.UpdateType == kubetypes.SyncPodKill:
if options.KillPodOptions != nil && options.KillPodOptions.Evict {
klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is being evicted by the kubelet, begin teardown")
status.evicted = true
} else {
klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(4).Info("Pod is being removed by the kubelet, begin teardown")
}
status.terminatingAt = now
becameTerminating = true
@ -899,7 +896,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
// after the pod worker completes.
if isRuntimePod {
klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
updateLogger.V(3).Info("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated")
return
}
@ -966,8 +963,8 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// TODO: this should be a wait.Until with backoff to handle panics, and
// accept a context for shutdown
defer runtime.HandleCrash()
defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
p.podWorkerLoop(uid, outCh)
defer logger.V(3).Info("Pod worker has stopped", "podUID", uid)
p.podWorkerLoop(ctx, uid, outCh)
}()
}
@ -983,14 +980,14 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
status.pendingUpdate.Pod, _ = p.allocationManager.UpdatePodFromAllocation(options.Pod)
}
status.working = true
klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
updateLogger.V(4).Info("Notifying pod of pending update", "workType", status.WorkType())
select {
case podUpdates <- struct{}{}:
default:
}
if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
updateLogger.V(3).Info("Cancelling current pod sync", "workType", status.WorkType())
status.cancelFn()
return
}
@ -1035,7 +1032,7 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *
// it requeues the pod and returns false. If the pod will never be able to start
// because data is missing, or the pod was terminated before start, canEverStart
// is false. This method can only be called while holding the pod lock.
func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) {
func (p *podWorkers) allowPodStart(logger klog.Logger, pod *v1.Pod) (canStart bool, canEverStart bool) {
if !kubetypes.IsStaticPod(pod) {
// TODO: Do we want to allow non-static pods with the same full name?
// Note that it may disable the force deletion of pods.
@ -1043,7 +1040,7 @@ func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart boo
}
status, ok := p.podSyncStatuses[pod.UID]
if !ok {
klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.Error(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
return false, false
}
if status.IsTerminationRequested() {
@ -1094,14 +1091,14 @@ func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
// cleanupUnstartedPod is invoked if a pod that has never been started receives a termination
// signal before it can be started. This method must be called holding the pod lock.
func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) {
func (p *podWorkers) cleanupUnstartedPod(logger klog.Logger, pod *v1.Pod, status *podSyncStatus) {
p.cleanupPodUpdates(pod.UID)
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
if !status.terminatedAt.IsZero() {
klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.finished = true
status.working = false
@ -1121,7 +1118,7 @@ func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) {
// This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
// or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started
// should never have an activeUpdate because that is exposed to downstream components on started pods.
func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
func (p *podWorkers) startPodSync(parentCtx context.Context, podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
p.podLock.Lock()
defer p.podLock.Unlock()
@ -1129,18 +1126,18 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
status, ok := p.podSyncStatuses[podUID]
if !ok {
// pod status has disappeared, the worker should exit
klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID)
return nil, update, false, false, false
}
logger := klog.FromContext(parentCtx)
if !status.working {
// working is used by unit tests to observe whether a worker is currently acting on this pod
klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID)
logger.V(4).Info("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID)
}
if status.pendingUpdate == nil {
// no update available, this means we were queued without work being added or there is a
// race condition, both of which are unexpected
status.working = false
klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID)
logger.V(4).Info("Pod worker received no pending work, programmer error?", "podUID", podUID)
return nil, update, false, false, false
}
@ -1154,11 +1151,7 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
default:
}
// initialize a context for the worker if one does not exist
if status.ctx == nil || status.ctx.Err() == context.Canceled {
status.ctx, status.cancelFn = context.WithCancel(context.Background())
}
ctx = status.ctx
ctx, status.cancelFn = context.WithCancel(parentCtx)
// if we are already started, make our state visible to downstream components
if status.IsStarted() {
@ -1178,26 +1171,26 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
// asked to start such a pod, but guard here just in case an accident occurs.
if update.Options.Pod == nil {
status.mergeLastUpdate(update.Options)
klog.V(4).InfoS("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
logger.V(4).Info("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
return ctx, update, false, false, true
}
// verify we can start
canStart, canEverStart = p.allowPodStart(update.Options.Pod)
canStart, canEverStart = p.allowPodStart(logger, update.Options.Pod)
switch {
case !canEverStart:
p.cleanupUnstartedPod(update.Options.Pod, status)
p.cleanupUnstartedPod(logger, update.Options.Pod, status)
status.working = false
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
logger.V(4).Info("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
return ctx, update, canStart, canEverStart, true
case !canStart:
// this is the only path we don't start the pod, so we need to put the change back in pendingUpdate
status.pendingUpdate = &update.Options
status.working = false
klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
logger.V(4).Info("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
return ctx, update, canStart, canEverStart, true
}
@ -1234,14 +1227,15 @@ func podUIDAndRefForUpdate(update UpdatePodOptions) (types.UID, klog.ObjectRef)
// to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
// caller. When a pod transitions working->terminating or terminating->terminated, the next update is
// queued immediately and no kubelet action is required.
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
func (p *podWorkers) podWorkerLoop(parentCtx context.Context, podUID types.UID, podUpdates <-chan struct{}) {
var lastSyncTime time.Time
for range podUpdates {
ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
ctx, update, canStart, canEverStart, ok := p.startPodSync(parentCtx, podUID)
// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
if !ok {
continue
}
logger := klog.FromContext(ctx)
// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
if !canEverStart {
return
@ -1253,7 +1247,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
podUID, podRef := podUIDAndRefForUpdate(update.Options)
klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
logger.V(4).Info("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
var isTerminal bool
err := func() error {
// The worker is responsible for ensuring the sync method sees the appropriate
@ -1296,7 +1290,7 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
if opt := update.Options.KillPodOptions; opt != nil {
gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
}
podStatusFn := p.acknowledgeTerminating(podUID)
podStatusFn := p.acknowledgeTerminating(logger, podUID)
// if we only have a running pod, terminate it directly
if update.Options.RunningPod != nil {
@ -1317,55 +1311,55 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
switch {
case err == context.Canceled:
// when the context is cancelled we expect an update to already be queued
klog.V(2).InfoS("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
logger.V(2).Info("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
case err != nil:
// we will queue a retry
klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)
logger.Error(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)
case update.WorkType == TerminatedPod:
// we can shut down the worker
p.completeTerminated(podUID)
p.completeTerminated(logger, podUID)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
logger.V(4).Info("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
return
case update.WorkType == TerminatingPod:
// pods that don't exist in config don't need to be terminated, other loops will clean them up
if update.Options.RunningPod != nil {
p.completeTerminatingRuntimePod(podUID)
p.completeTerminatingRuntimePod(logger, podUID)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
logger.V(4).Info("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
return
}
// otherwise we move to the terminating phase
p.completeTerminating(podUID)
p.completeTerminating(logger, podUID)
phaseTransition = true
case isTerminal:
// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
p.completeSync(podUID)
logger.V(4).Info("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
p.completeSync(logger, podUID)
phaseTransition = true
}
// queue a retry if necessary, then put the next event in the channel if any
p.completeWork(podUID, phaseTransition, err)
p.completeWork(logger, podUID, phaseTransition, err)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
logger.V(4).Info("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
}
}
// acknowledgeTerminating sets the terminating flag on the pod status once the pod worker sees
// the termination state so that other components know no new containers will be started in this
// pod. It then returns the status function, if any, that applies to this pod.
func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc {
func (p *podWorkers) acknowledgeTerminating(logger klog.Logger, podUID types.UID) PodStatusFunc {
p.podLock.Lock()
defer p.podLock.Unlock()
@ -1375,7 +1369,7 @@ func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc {
}
if !status.terminatingAt.IsZero() && !status.startedTerminating {
klog.V(4).InfoS("Pod worker has observed request to terminate", "podUID", podUID)
logger.V(4).Info("Pod worker has observed request to terminate", "podUID", podUID)
status.startedTerminating = true
}
@ -1389,15 +1383,15 @@ func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc {
// be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways
// exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by
// UpdatePod.
func (p *podWorkers) completeSync(podUID types.UID) {
func (p *podWorkers) completeSync(logger klog.Logger, podUID types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID)
logger.V(4).Info("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID)
status, ok := p.podSyncStatuses[podUID]
if !ok {
klog.V(4).InfoS("Pod had no status in completeSync, programmer error?", "podUID", podUID)
logger.V(4).Info("Pod had no status in completeSync, programmer error?", "podUID", podUID)
return
}
@ -1405,7 +1399,7 @@ func (p *podWorkers) completeSync(podUID types.UID) {
if status.terminatingAt.IsZero() {
status.terminatingAt = p.clock.Now()
} else {
klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID)
logger.V(4).Info("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID)
}
status.startedTerminating = true
@ -1418,11 +1412,11 @@ func (p *podWorkers) completeSync(podUID types.UID) {
// no container is running, no container will be started in the future, and we are ready for
// cleanup. This updates the termination state which prevents future syncs and will ensure
// other kubelet loops know this pod is not running any containers.
func (p *podWorkers) completeTerminating(podUID types.UID) {
func (p *podWorkers) completeTerminating(logger klog.Logger, podUID types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID)
logger.V(4).Info("Pod terminated all containers successfully", "podUID", podUID)
status, ok := p.podSyncStatuses[podUID]
if !ok {
@ -1431,7 +1425,7 @@ func (p *podWorkers) completeTerminating(podUID types.UID) {
// update the status of the pod
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
logger.V(4).Info("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
}
status.terminatedAt = p.clock.Now()
for _, ch := range status.notifyPostTerminating {
@ -1449,11 +1443,11 @@ func (p *podWorkers) completeTerminating(podUID types.UID) {
// which means an orphaned pod (no config) is terminated and we can exit. Since orphaned
// pods have no API representation, we want to exit the loop at this point and ensure no
// status is present afterwards - the running pod is truly terminated when this is invoked.
func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) {
func (p *podWorkers) completeTerminatingRuntimePod(logger klog.Logger, podUID types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID)
logger.V(4).Info("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID)
p.cleanupPodUpdates(podUID)
@ -1462,7 +1456,7 @@ func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) {
return
}
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
logger.V(4).Info("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
}
status.terminatedAt = p.clock.Now()
status.finished = true
@ -1479,11 +1473,11 @@ func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) {
// completeTerminated is invoked after syncTerminatedPod completes successfully and means we
// can stop the pod worker. The pod is finalized at this point.
func (p *podWorkers) completeTerminated(podUID types.UID) {
func (p *podWorkers) completeTerminated(logger klog.Logger, podUID types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod is complete and the worker can now stop", "podUID", podUID)
logger.V(4).Info("Pod is complete and the worker can now stop", "podUID", podUID)
p.cleanupPodUpdates(podUID)
@ -1492,10 +1486,10 @@ func (p *podWorkers) completeTerminated(podUID types.UID) {
return
}
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID)
logger.V(4).Info("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID)
}
if status.terminatedAt.IsZero() {
klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID)
logger.V(4).Info("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID)
}
status.finished = true
status.working = false
@ -1507,7 +1501,7 @@ func (p *podWorkers) completeTerminated(podUID types.UID) {
// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncErr error) {
func (p *podWorkers) completeWork(logger klog.Logger, podUID types.UID, phaseTransition bool, syncErr error) {
// Requeue the last update if the last sync returned error.
switch {
case phaseTransition:
@ -1541,9 +1535,9 @@ func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncEr
if status.pendingUpdate != nil {
select {
case p.podUpdates[podUID] <- struct{}{}:
klog.V(4).InfoS("Requeuing pod due to pending update", "podUID", podUID)
logger.V(4).Info("Requeuing pod due to pending update", "podUID", podUID)
default:
klog.V(4).InfoS("Pending update already queued", "podUID", podUID)
logger.V(4).Info("Pending update already queued", "podUID", podUID)
}
} else {
status.working = false
@ -1559,7 +1553,7 @@ func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncEr
// of known workers that are not finished with a value of SyncPodTerminated,
// SyncPodKill, or SyncPodSync depending on whether the pod is terminated, terminating,
// or syncing.
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
func (p *podWorkers) SyncKnownPods(logger klog.Logger, desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
workers := make(map[types.UID]PodWorkerSync)
known := make(map[types.UID]struct{})
for _, pod := range desiredPods {
@ -1588,7 +1582,7 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke
_, knownPod := known[uid]
orphan := !knownPod
if status.restartRequested || orphan {
if p.removeTerminatedWorker(uid, status, orphan) {
if p.removeTerminatedWorker(logger, uid, status, orphan) {
// no worker running, we won't return it
continue
}
@ -1622,11 +1616,11 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke
// terminated pods to prevent accidentally restarting a terminal pod, which is
// proportional to the number of pods described in the pod config. The method
// returns true if the worker was completely removed.
func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus, orphaned bool) bool {
func (p *podWorkers) removeTerminatedWorker(logger klog.Logger, uid types.UID, status *podSyncStatus, orphaned bool) bool {
if !status.finished {
// If the pod worker has not reached terminal state and the pod is still known, we wait.
if !orphaned {
klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
logger.V(4).Info("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
return false
}
@ -1640,7 +1634,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus
case !status.IsStarted() && !status.observedRuntime:
// The pod has not been started, which means we can safely clean up the pod - the
// pod worker will shut down as a result of this change without executing a sync.
klog.V(4).InfoS("Pod is orphaned and has not been started", "podUID", uid)
logger.V(4).Info("Pod is orphaned and has not been started", "podUID", uid)
case !status.IsTerminationRequested():
// The pod has been started but termination has not been requested - set the appropriate
// timestamp and notify the pod worker. Because the pod has been synced at least once,
@ -1652,22 +1646,22 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus
status.gracePeriod = 1
}
p.requeueLastPodUpdate(uid, status)
klog.V(4).InfoS("Pod is orphaned and still running, began terminating", "podUID", uid)
logger.V(4).Info("Pod is orphaned and still running, began terminating", "podUID", uid)
return false
default:
// The pod is already moving towards termination, notify the pod worker. Because the pod
// has been synced at least once, the value of status.activeUpdate will be the fallback for
// the next sync.
p.requeueLastPodUpdate(uid, status)
klog.V(4).InfoS("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid)
logger.V(4).Info("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid)
return false
}
}
if status.restartRequested {
klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
logger.V(4).Info("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
} else {
klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
logger.V(4).Info("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
}
delete(p.podSyncStatuses, uid)
p.cleanupPodUpdates(uid)
@ -1680,7 +1674,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus
// killPodNow returns a KillPodFunc that can be used to kill a pod.
// It is intended to be injected into other modules that need to kill a pod.
func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
func killPodNow(ctx context.Context, podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error {
// determine the grace period to use when killing the pod
gracePeriod := int64(0)
@ -1701,16 +1695,18 @@ func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.K
// open a channel we block against until we get a result
ch := make(chan struct{}, 1)
podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
KillPodOptions: &KillPodOptions{
CompletedCh: ch,
Evict: isEvicted,
PodStatusFunc: statusFn,
PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
},
})
podWorkers.UpdatePod(
ctx,
UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
KillPodOptions: &KillPodOptions{
CompletedCh: ch,
Evict: isEvicted,
PodStatusFunc: statusFn,
PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
},
})
// wait for either a response, or a timeout
select {

View file

@ -34,11 +34,13 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/allocation"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/clock"
clocktesting "k8s.io/utils/clock/testing"
)
@ -65,7 +67,7 @@ type fakePodWorkers struct {
terminatingStaticPods map[string]bool
}
func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
func (f *fakePodWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions) {
f.lock.Lock()
defer f.lock.Unlock()
var uid types.UID
@ -85,7 +87,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
case kubetypes.SyncPodKill:
f.triggeredDeletion = append(f.triggeredDeletion, uid)
default:
isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status)
isTerminal, err := f.syncPodFn(ctx, options.UpdateType, options.Pod, options.MirrorPod, status)
if err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
@ -95,7 +97,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
}
}
func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
func (f *fakePodWorkers) SyncKnownPods(_ klog.Logger, desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
return map[types.UID]PodWorkerSync{}
}
@ -260,11 +262,11 @@ type timeIncrementingWorkers struct {
// UpdatePod increments the clock after UpdatePod is called, but before the workers
// are invoked, and then drains all workers before returning. The provided functions
// are invoked while holding the lock to prevent workers from receiving updates.
func (w *timeIncrementingWorkers) UpdatePod(options UpdatePodOptions, afterFns ...func()) {
func (w *timeIncrementingWorkers) UpdatePod(ctx context.Context, options UpdatePodOptions, afterFns ...func()) {
func() {
w.lock.Lock()
defer w.lock.Unlock()
w.w.UpdatePod(options)
w.w.UpdatePod(ctx, options)
w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
for _, fn := range afterFns {
fn()
@ -275,11 +277,11 @@ func (w *timeIncrementingWorkers) UpdatePod(options UpdatePodOptions, afterFns .
// SyncKnownPods increments the clock after SyncKnownPods is called, but before the workers
// are invoked, and then drains all workers before returning.
func (w *timeIncrementingWorkers) SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) {
func (w *timeIncrementingWorkers) SyncKnownPods(logger klog.Logger, desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) {
func() {
w.lock.Lock()
defer w.lock.Unlock()
knownPods = w.w.SyncKnownPods(desiredPods)
knownPods = w.w.SyncKnownPods(logger, desiredPods)
w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
}()
w.drainUnpausedWorkers()
@ -368,8 +370,8 @@ func (w *timeIncrementingWorkers) tick() {
// createTimeIncrementingPodWorkers will guarantee that each call to UpdatePod and each worker goroutine invocation advances the clock by one second,
// although multiple workers will advance the clock in an unpredictable order. Use to observe
// successive internal updates to each update pod state when only a single pod is being updated.
func createTimeIncrementingPodWorkers() (*timeIncrementingWorkers, map[types.UID][]syncPodRecord) {
nested, runtime, processed := createPodWorkers()
func createTimeIncrementingPodWorkers(logger klog.Logger) (*timeIncrementingWorkers, map[types.UID][]syncPodRecord) {
nested, runtime, processed := createPodWorkers(logger)
w := &timeIncrementingWorkers{
w: nested,
runtime: runtime,
@ -391,7 +393,11 @@ func createTimeIncrementingPodWorkers() (*timeIncrementingWorkers, map[types.UID
return w, processed
}
func createPodWorkers() (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) {
func createPodWorkers(logger klog.Logger) (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) {
return createPodWorkersWithLogger(logger)
}
func createPodWorkersWithLogger(_ klog.Logger) (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) {
lock := sync.Mutex{}
processed := make(map[types.UID][]syncPodRecord)
fakeRecorder := &record.FakeRecorder{}
@ -523,12 +529,13 @@ func drainAllWorkers(podWorkers *podWorkers) {
}
func TestUpdatePodParallel(t *testing.T) {
podWorkers, _, processed := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
numPods := 20
for i := 0; i < numPods; i++ {
for j := i; j < numPods; j++ {
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod(strconv.Itoa(j), "ns", strconv.Itoa(i), false),
UpdateType: kubetypes.SyncPodCreate,
})
@ -557,8 +564,8 @@ func TestUpdatePodParallel(t *testing.T) {
func TestUpdatePod(t *testing.T) {
one := int64(1)
hasContext := func(status *podSyncStatus) *podSyncStatus {
status.ctx, status.cancelFn = context.Background(), func() {}
hasCancelFn := func(status *podSyncStatus) *podSyncStatus {
status.cancelFn = func() {}
return status
}
withLabel := func(pod *v1.Pod, label, value string) *v1.Pod {
@ -580,11 +587,16 @@ func TestUpdatePod(t *testing.T) {
t.Helper()
// handle special non-comparable fields
if status != nil {
if e, a := expected.ctx != nil, status.ctx != nil; e != a {
t.Errorf("expected context %t, has context %t", e, a)
} else {
expected.ctx, status.ctx = nil, nil
clearLogger := func(opts *UpdatePodOptions) {
if opts == nil {
return
}
}
clearLogger(expected.pendingUpdate)
clearLogger(expected.activeUpdate)
clearLogger(status.pendingUpdate)
clearLogger(status.activeUpdate)
if e, a := expected.cancelFn != nil, status.cancelFn != nil; e != a {
t.Errorf("expected cancelFn %t, has cancelFn %t", e, a)
} else {
@ -599,7 +611,7 @@ func TestUpdatePod(t *testing.T) {
name string
update UpdatePodOptions
runtimeStatus *kubecontainer.PodStatus
prepare func(t *testing.T, w *timeIncrementingWorkers) (afterUpdateFn func())
prepare func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) (afterUpdateFn func())
expect *podSyncStatus
expectBeforeWorker *podSyncStatus
@ -611,7 +623,7 @@ func TestUpdatePod(t *testing.T) {
UpdateType: kubetypes.SyncPodCreate,
Pod: newNamedPod("1", "ns", "running-pod", false),
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "running-pod_ns",
syncedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
@ -626,19 +638,19 @@ func TestUpdatePod(t *testing.T) {
UpdateType: kubetypes.SyncPodCreate,
Pod: withLabel(newNamedPod("1", "ns", "running-pod", false), "updated", "value"),
},
prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
w.UpdatePod(UpdatePodOptions{
prepare: func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) func() {
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newNamedPod("1", "ns", "running-pod", false),
})
w.PauseWorkers("1")
w.UpdatePod(UpdatePodOptions{
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
Pod: newNamedPod("1", "ns", "running-pod", false),
})
return func() { w.ReleaseWorkersUnderLock("1") }
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "running-pod_ns",
syncedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
@ -662,7 +674,7 @@ func TestUpdatePod(t *testing.T) {
Pod: newNamedPod("1", "ns", "running-pod", false),
RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "running-pod_ns",
syncedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
@ -677,14 +689,14 @@ func TestUpdatePod(t *testing.T) {
UpdateType: kubetypes.SyncPodUpdate,
Pod: withDeletionTimestamp(newNamedPod("1", "ns", "running-pod", false), time.Unix(1, 0), intp(15)),
},
prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
w.UpdatePod(UpdatePodOptions{
prepare: func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) func() {
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newNamedPod("1", "ns", "running-pod", false),
})
return nil
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "running-pod_ns",
syncedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
@ -708,14 +720,14 @@ func TestUpdatePod(t *testing.T) {
Pod: newNamedPod("1", "ns", "running-pod", false),
KillPodOptions: &KillPodOptions{Evict: true},
},
prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
w.UpdatePod(UpdatePodOptions{
prepare: func(t *testing.T, tCtx context.Context, w *timeIncrementingWorkers) func() {
w.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newNamedPod("1", "ns", "running-pod", false),
})
return nil
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "running-pod_ns",
syncedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
@ -741,7 +753,7 @@ func TestUpdatePod(t *testing.T) {
UpdateType: kubetypes.SyncPodCreate,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "done-pod_ns",
syncedAt: time.Unix(1, 0),
terminatingAt: time.Unix(1, 0),
@ -777,7 +789,7 @@ func TestUpdatePod(t *testing.T) {
startedTerminating: true,
working: true,
},
expect: hasContext(&podSyncStatus{
expect: hasCancelFn(&podSyncStatus{
fullname: "done-pod_ns",
syncedAt: time.Unix(1, 0),
terminatingAt: time.Unix(1, 0),
@ -842,7 +854,8 @@ func TestUpdatePod(t *testing.T) {
var fns []func()
podWorkers, _ := createTimeIncrementingPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _ := createTimeIncrementingPodWorkers(logger)
if tc.expectBeforeWorker != nil {
fns = append(fns, func() {
@ -851,7 +864,7 @@ func TestUpdatePod(t *testing.T) {
}
if tc.prepare != nil {
if fn := tc.prepare(t, podWorkers); fn != nil {
if fn := tc.prepare(t, tCtx, podWorkers); fn != nil {
fns = append(fns, fn)
}
}
@ -870,7 +883,7 @@ func TestUpdatePod(t *testing.T) {
podWorkers.runtime.Err = nil
})
podWorkers.UpdatePod(tc.update, fns...)
podWorkers.UpdatePod(tCtx, tc.update, fns...)
if podWorkers.w.IsPodKnownTerminated(uid) != tc.expectKnownTerminated {
t.Errorf("podWorker.IsPodKnownTerminated expected to be %t", tc.expectKnownTerminated)
@ -996,7 +1009,8 @@ func TestCompleteWork_Enqueue(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podWorkers, _, _ := createPodWorkers()
logger, _ := ktesting.NewTestContext(t)
podWorkers, _, _ := createPodWorkers(logger)
podWorkers.clock = clock
fakeQueue := podWorkers.workQueue.(*fakeQueue)
podUID := types.UID("12345")
@ -1004,7 +1018,7 @@ func TestCompleteWork_Enqueue(t *testing.T) {
podWorkers.resyncInterval = resyncInterval
podWorkers.backOffPeriod = defaultBackoff
podWorkers.podSyncStatuses[podUID] = &podSyncStatus{}
podWorkers.completeWork(podUID, tc.phaseTransition, tc.syncErr)
podWorkers.completeWork(logger, podUID, tc.phaseTransition, tc.syncErr)
if fakeQueue.Empty() {
t.Fatalf("work queue should not be empty")
@ -1030,13 +1044,14 @@ func TestCompleteWork_PendingUpdate(t *testing.T) {
podUID := types.UID("pod-with-pending-update-check")
t.Run("with nil pendingUpdate, clears working status", func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
p := &podWorkers{
podSyncStatuses: make(map[types.UID]*podSyncStatus),
workQueue: &fakeQueue{},
}
p.podSyncStatuses[podUID] = &podSyncStatus{working: true, pendingUpdate: nil}
p.completeWork(podUID, false, nil)
p.completeWork(logger, podUID, false, nil)
p.podLock.Lock()
defer p.podLock.Unlock()
@ -1046,6 +1061,7 @@ func TestCompleteWork_PendingUpdate(t *testing.T) {
})
t.Run("with non-nil pendingUpdate, queues an update signal", func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
p := &podWorkers{
podSyncStatuses: make(map[types.UID]*podSyncStatus),
podUpdates: make(map[types.UID]chan struct{}),
@ -1058,7 +1074,7 @@ func TestCompleteWork_PendingUpdate(t *testing.T) {
}
p.podSyncStatuses[podUID] = &podSyncStatus{working: true, pendingUpdate: dummyUpdate}
p.completeWork(podUID, false, nil)
p.completeWork(logger, podUID, false, nil)
select {
case <-p.podUpdates[podUID]:
@ -1070,10 +1086,11 @@ func TestCompleteWork_PendingUpdate(t *testing.T) {
}
func TestUpdatePodForRuntimePod(t *testing.T) {
podWorkers, _, processed := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
// ignores running pod of wrong sync type
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
})
@ -1083,7 +1100,7 @@ func TestUpdatePodForRuntimePod(t *testing.T) {
}
// creates synthetic pod
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
})
@ -1101,7 +1118,8 @@ func TestUpdatePodForRuntimePod(t *testing.T) {
}
func TestUpdatePodForTerminatedRuntimePod(t *testing.T) {
podWorkers, _, processed := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
now := time.Now()
podWorkers.podSyncStatuses[types.UID("1")] = &podSyncStatus{
@ -1112,7 +1130,7 @@ func TestUpdatePodForTerminatedRuntimePod(t *testing.T) {
}
// creates synthetic pod
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
})
@ -1127,19 +1145,20 @@ func TestUpdatePodForTerminatedRuntimePod(t *testing.T) {
}
func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
podWorkers, _, processed := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
numPods := 20
for i := 0; i < numPods; i++ {
pod := newNamedPod(strconv.Itoa(i), "ns", strconv.Itoa(i), false)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodCreate,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1212,14 +1231,15 @@ func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync {
}
func TestTerminalPhaseTransition(t *testing.T) {
podWorkers, _, _ := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, _ := createPodWorkers(logger)
var channels WorkChannel
podWorkers.workerChannelFn = channels.Intercept
terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.podSyncer.(*podSyncerFuncs).syncPod)
podWorkers.podSyncer.(*podSyncerFuncs).syncPod = terminalPhaseSyncer.SyncPod
// start pod
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1232,7 +1252,7 @@ func TestTerminalPhaseTransition(t *testing.T) {
}
// send another update to the pod
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1246,7 +1266,7 @@ func TestTerminalPhaseTransition(t *testing.T) {
// the next sync should result in a transition to terminal
terminalPhaseSyncer.SetTerminal(types.UID("1"))
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1264,7 +1284,8 @@ func TestStaticPodExclusion(t *testing.T) {
t.Skip("skipping test in short mode.")
}
podWorkers, _, processed := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
var channels WorkChannel
podWorkers.workerChannelFn = channels.Intercept
@ -1274,11 +1295,11 @@ func TestStaticPodExclusion(t *testing.T) {
}
// start two pods with the same name, one static, one apiserver
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("1-normal", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("2-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1307,11 +1328,11 @@ func TestStaticPodExclusion(t *testing.T) {
}
// attempt to start a second and third static pod, which should not start
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("3-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("4-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1366,7 +1387,7 @@ func TestStaticPodExclusion(t *testing.T) {
// send a basic update for 3-static
podWorkers.workQueue.GetWork()
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("3-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1386,7 +1407,7 @@ func TestStaticPodExclusion(t *testing.T) {
// mark 3-static as deleted while 2-static is still running
podWorkers.workQueue.GetWork()
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("3-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
@ -1411,7 +1432,7 @@ func TestStaticPodExclusion(t *testing.T) {
}
// terminate 2-static
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("2-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
@ -1430,7 +1451,7 @@ func TestStaticPodExclusion(t *testing.T) {
}
// simulate a periodic event from the work queue for 4-static
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("4-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1449,7 +1470,7 @@ func TestStaticPodExclusion(t *testing.T) {
}
// initiate a sync with all pods remaining
state := podWorkers.SyncKnownPods([]*v1.Pod{
state := podWorkers.SyncKnownPods(logger, []*v1.Pod{
newNamedPod("1-normal", "test1", "pod1", false),
newNamedPod("2-static", "test1", "pod1", true),
newNamedPod("3-static", "test1", "pod1", true),
@ -1472,7 +1493,7 @@ func TestStaticPodExclusion(t *testing.T) {
}
// initiate a sync with 3-static removed
state = podWorkers.SyncKnownPods([]*v1.Pod{
state = podWorkers.SyncKnownPods(logger, []*v1.Pod{
newNamedPod("1-normal", "test1", "pod1", false),
newNamedPod("2-static", "test1", "pod1", true),
newNamedPod("4-static", "test1", "pod1", true),
@ -1493,18 +1514,18 @@ func TestStaticPodExclusion(t *testing.T) {
// start a static pod, kill it, then add another one, but ensure the pod worker
// for pod 5 doesn't see the kill event (so it remains waiting to start)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("5-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
// Wait for the previous work to be delivered to the worker
drainAllWorkers(podWorkers)
channels.Channel("5-static").Hold()
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("5-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1523,12 +1544,12 @@ func TestStaticPodExclusion(t *testing.T) {
}
// terminate 4-static and wake 6-static
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("4-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainWorkersExcept(podWorkers, "5-static")
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1563,30 +1584,30 @@ func TestStaticPodExclusion(t *testing.T) {
// start three more static pods, kill the previous static pod blocking start,
// and simulate the second pod of three (8) getting to run first
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("7-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("8-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("9-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodCreate,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("8-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1622,12 +1643,12 @@ func TestStaticPodExclusion(t *testing.T) {
}
// terminate 7-static and wake 8-static
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("7-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod("8-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1641,7 +1662,7 @@ func TestStaticPodExclusion(t *testing.T) {
}
// initiate a sync with all but 8-static pods undesired
state = podWorkers.SyncKnownPods([]*v1.Pod{
state = podWorkers.SyncKnownPods(logger, []*v1.Pod{
newNamedPod("8-static", "test1", "pod1", true),
})
drainAllWorkers(podWorkers)
@ -1740,11 +1761,12 @@ func (w *WorkChannel) Intercept(uid types.UID, ch chan struct{}) (outCh <-chan s
}
func TestSyncKnownPods(t *testing.T) {
podWorkers, _, _ := createPodWorkers()
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, _ := createPodWorkers(logger)
numPods := 20
for i := 0; i < numPods; i++ {
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: newNamedPod(strconv.Itoa(i), "ns", "name", false),
UpdateType: kubetypes.SyncPodUpdate,
})
@ -1770,7 +1792,7 @@ func TestSyncKnownPods(t *testing.T) {
now := metav1.Now()
pod.DeletionTimestamp = &now
}
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
@ -1823,7 +1845,7 @@ func TestSyncKnownPods(t *testing.T) {
t.Errorf("Expected pod to not be suitable for removal (does not exist but not yet synced)")
}
podWorkers.SyncKnownPods(desiredPodList)
podWorkers.SyncKnownPods(logger, desiredPodList)
if len(podWorkers.podUpdates) != 2 {
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
}
@ -1849,7 +1871,7 @@ func TestSyncKnownPods(t *testing.T) {
// verify workers that are not terminated stay open even if config no longer
// sees them
podWorkers.SyncKnownPods(nil)
podWorkers.SyncKnownPods(logger, nil)
drainAllWorkers(podWorkers)
if len(podWorkers.podUpdates) != 0 {
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
@ -1860,7 +1882,7 @@ func TestSyncKnownPods(t *testing.T) {
for uid := range desiredPods {
pod := newNamedPod(string(uid), "ns", "name", false)
podWorkers.UpdatePod(UpdatePodOptions{
podWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
@ -1868,7 +1890,7 @@ func TestSyncKnownPods(t *testing.T) {
drainWorkers(podWorkers, numPods)
// verify once those pods terminate (via some other flow) the workers are cleared
podWorkers.SyncKnownPods(nil)
podWorkers.SyncKnownPods(logger, nil)
if len(podWorkers.podUpdates) != 0 {
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
}
@ -2047,7 +2069,16 @@ func Test_removeTerminatedWorker(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
podWorkers, _, _ := createPodWorkers()
// normalizeUpdatePodOptions returns a pointer to a shallow copy of opts,
// or nil when opts itself is nil. Both the expected and the actual pending
// update are passed through it before they are compared.
normalizeUpdatePodOptions := func(opts *UpdatePodOptions) *UpdatePodOptions {
	if opts != nil {
		// Plain struct value copy: pointer-typed fields still refer to
		// the same underlying objects as the input.
		clone := *opts
		return &clone
	}
	return nil
}
logger, _ := ktesting.NewTestContext(t)
podWorkers, _, _ := createPodWorkers(logger)
podWorkers.podSyncStatuses[podUID] = tc.podSyncStatus
podWorkers.podUpdates[podUID] = make(chan struct{}, 1)
if tc.podSyncStatus.working {
@ -2056,7 +2087,7 @@ func Test_removeTerminatedWorker(t *testing.T) {
podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
podWorkers.removeTerminatedWorker(podUID, podWorkers.podSyncStatuses[podUID], tc.orphan)
podWorkers.removeTerminatedWorker(logger, podUID, podWorkers.podSyncStatuses[podUID], tc.orphan)
status, exists := podWorkers.podSyncStatuses[podUID]
if tc.removed && exists {
t.Fatalf("Expected pod worker to be removed")
@ -2070,8 +2101,10 @@ func Test_removeTerminatedWorker(t *testing.T) {
if tc.expectGracePeriod > 0 && status.gracePeriod != tc.expectGracePeriod {
t.Errorf("Unexpected grace period %d", status.gracePeriod)
}
if !reflect.DeepEqual(tc.expectPending, status.pendingUpdate) {
t.Errorf("Unexpected pending: %s", cmp.Diff(tc.expectPending, status.pendingUpdate))
expectedPending := normalizeUpdatePodOptions(tc.expectPending)
actualPending := normalizeUpdatePodOptions(status.pendingUpdate)
if !reflect.DeepEqual(expectedPending, actualPending) {
t.Errorf("Unexpected pending: %s", cmp.Diff(expectedPending, actualPending))
}
if tc.expectPending != nil {
if !status.working {
@ -2118,6 +2151,7 @@ func (kl *simpleFakeKubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod,
// TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers
// for their invocation of the syncPodFn.
func TestFakePodWorkers(t *testing.T) {
tCtx := ktesting.Init(t)
fakeRecorder := &record.FakeRecorder{}
fakeRuntime := &containertest.FakeRuntime{}
fakeCache := containertest.NewFakeCache(fakeRuntime)
@ -2162,12 +2196,12 @@ func TestFakePodWorkers(t *testing.T) {
for i, tt := range tests {
kubeletForRealWorkers.wg.Add(1)
realPodWorkers.UpdatePod(UpdatePodOptions{
realPodWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: tt.pod,
MirrorPod: tt.mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
})
fakePodWorkers.UpdatePod(UpdatePodOptions{
fakePodWorkers.UpdatePod(tCtx, UpdatePodOptions{
Pod: tt.pod,
MirrorPod: tt.mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
@ -2192,8 +2226,9 @@ func TestFakePodWorkers(t *testing.T) {
// TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected.
func TestKillPodNowFunc(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
podWorkers, _, processed := createPodWorkers()
killPodFunc := killPodNow(podWorkers, fakeRecorder)
logger, tCtx := ktesting.NewTestContext(t)
podWorkers, _, processed := createPodWorkers(logger)
killPodFunc := killPodNow(tCtx, podWorkers, fakeRecorder)
pod := newNamedPod("test", "ns", "test", false)
gracePeriodOverride := int64(0)
err := killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
@ -2524,7 +2559,8 @@ func Test_allowPodStart(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
podWorkers, _, _ := createPodWorkers()
logger, _ := ktesting.NewTestContext(t)
podWorkers, _, _ := createPodWorkers(logger)
if tc.podSyncStatuses != nil {
podWorkers.podSyncStatuses = tc.podSyncStatuses
}
@ -2534,7 +2570,7 @@ func Test_allowPodStart(t *testing.T) {
if tc.waitingToStartStaticPodsByFullname != nil {
podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
}
allowed, allowedEver := podWorkers.allowPodStart(tc.pod)
allowed, allowedEver := podWorkers.allowPodStart(logger, tc.pod)
if allowed != tc.allowed {
if tc.allowed {
t.Errorf("Pod should be allowed")