From e649292782f19285ad031c7b5472d76e1bcd903b Mon Sep 17 00:00:00 2001 From: hoteye Date: Thu, 22 Jan 2026 09:06:35 +0800 Subject: [PATCH] kubelet: migrate kubelet_getters.go to contextual logging. Migrate GetNode and related functions to use contextual logging --- pkg/kubelet/cm/container_manager.go | 2 +- pkg/kubelet/cm/dra/manager.go | 2 +- pkg/kubelet/cm/dra/manager_test.go | 2 +- .../cm/dra/plugin/dra_plugin_manager.go | 6 ++-- .../cm/dra/plugin/registration_test.go | 3 +- pkg/kubelet/kubelet.go | 8 ++--- pkg/kubelet/kubelet_getters.go | 19 ++++++----- pkg/kubelet/kubelet_getters_test.go | 4 +-- pkg/kubelet/kubelet_node_status.go | 2 +- pkg/kubelet/kubelet_nodecache.go | 2 +- pkg/kubelet/kubelet_pods.go | 17 +++++----- pkg/kubelet/kubelet_pods_test.go | 8 ++--- pkg/kubelet/kubelet_resources.go | 5 +-- pkg/kubelet/kubelet_resources_test.go | 4 ++- pkg/kubelet/server/server_test.go | 2 +- pkg/kubelet/server/stats/handler.go | 2 +- pkg/kubelet/server/stats/summary.go | 4 +-- pkg/kubelet/server/stats/summary_test.go | 6 ++-- .../server/stats/summary_windows_test.go | 2 +- pkg/kubelet/server/stats/testing/mocks.go | 33 +++++++++++-------- .../util/manager/cache_based_manager.go | 7 ++-- .../util/manager/cache_based_manager_test.go | 2 +- pkg/kubelet/volume_host.go | 9 +++-- 23 files changed, 85 insertions(+), 66 deletions(-) diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 7a2c002eba6..0e4357f7fb7 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -60,7 +60,7 @@ const ( type ActivePodsFunc func() []*v1.Pod -type GetNodeFunc func() (*v1.Node, error) +type GetNodeFunc func(context.Context) (*v1.Node, error) // Manages the containers running on a machine. type ContainerManager interface { diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index f316e37db12..e9b477a46b0 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -74,7 +74,7 @@ const defaultWipingDelay = 30 * time.Second type ActivePodsFunc func() []*v1.Pod // GetNodeFunc is a function that returns the node object using the kubelet's node lister. -type GetNodeFunc func() (*v1.Node, error) +type GetNodeFunc func(context.Context) (*v1.Node, error) // Manager is responsible for managing ResourceClaims. // It ensures that they are prepared before starting pods diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 58fe6ab5337..c2cf665ad68 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -707,7 +707,7 @@ func TestGetResources(t *testing.T) { } } -func getFakeNode() (*v1.Node, error) { +func getFakeNode(context.Context) (*v1.Node, error) { return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil } diff --git a/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go b/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go index fd08372c93b..85a0609b6a1 100644 --- a/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go +++ b/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go @@ -58,7 +58,7 @@ type DRAPluginManager struct { backgroundCtx context.Context cancel func(err error) kubeClient kubernetes.Interface - getNode func() (*v1.Node, error) + getNode func(context.Context) (*v1.Node, error) wipingDelay time.Duration streamHandler StreamHandler @@ -146,7 +146,7 @@ func (m *monitoredPlugin) HandleConn(_ context.Context, stats grpcstats.ConnStat // The context can be used to cancel all background activities. // If desired, Stop can be called in addition or instead of canceling // the context. It then also waits for background activities to stop. -func NewDRAPluginManager(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), streamHandler StreamHandler, wipingDelay time.Duration) *DRAPluginManager { +func NewDRAPluginManager(ctx context.Context, kubeClient kubernetes.Interface, getNode func(context.Context) (*v1.Node, error), streamHandler StreamHandler, wipingDelay time.Duration) *DRAPluginManager { ctx, cancel := context.WithCancelCause(ctx) pm := &DRAPluginManager{ backgroundCtx: klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "DRA registration handler")), @@ -228,7 +228,7 @@ func (pm *DRAPluginManager) wipeResourceSlices(ctx context.Context, driver strin // Error logging is done inside the loop. Context cancellation doesn't get logged. _ = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - node, err := pm.getNode() + node, err := pm.getNode(ctx) if apierrors.IsNotFound(err) { return false, nil } diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index acf674162ef..57c9b2ed8fa 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -17,6 +17,7 @@ limitations under the License. package plugin import ( + "context" "path" "sort" "strings" @@ -47,7 +48,7 @@ const ( pluginB = "pluginB" ) -func getFakeNode() (*v1.Node, error) { +func getFakeNode(context.Context) (*v1.Node, error) { return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1924d91e86e..8eea28106af 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -901,7 +901,7 @@ func NewMainKubelet(ctx context.Context, if kubeDeps.TLSOptions != nil { if kubeCfg.ServerTLSBootstrap && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) { klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, func() []v1.NodeAddress { - return klet.getLastObservedNodeAddresses(logger) + return klet.getLastObservedNodeAddresses(ctx) }, certDirectory) if err != nil { return nil, fmt.Errorf("failed to initialize certificate manager: %w", err) @@ -1742,7 +1742,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) { // ignore any errors, since if stats collection is not successful, the container manager will fail to start below. kl.StatsProvider.GetCgroupStats("/", true) // Start container manager. - node, err := kl.getNodeAnyWay() + node, err := kl.getNodeAnyWay(ctx) if err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. klog.ErrorS(err, "Kubelet failed to get node info") @@ -3338,7 +3338,7 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror } } if mirrorPod == nil || deleted { - node, err := kl.GetNode() + node, err := kl.GetNode(ctx) if err != nil { klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) } else if node.DeletionTimestamp != nil { @@ -3355,7 +3355,7 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror // Ensure Mirror Pod for Static Pod exists as soon as node is registered. func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) { if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { - _, err := kl.GetNode() + _, err := kl.GetNode(ctx) if err == nil { return true, nil } diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index 70554880d81..85598f22896 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -286,9 +286,9 @@ func (kl *Kubelet) getRuntime() kubecontainer.Runtime { } // GetNode returns the node info for the configured node name of this Kubelet. -func (kl *Kubelet) GetNode() (*v1.Node, error) { +func (kl *Kubelet) GetNode(ctx context.Context) (*v1.Node, error) { if kl.kubeClient == nil { - return kl.initialNode(context.TODO()) + return kl.initialNode(ctx) } return kl.nodeLister.Get(string(kl.nodeName)) } @@ -298,11 +298,11 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) { // Return kubelet's nodeInfo for this node, except on error or if in standalone mode, // in which case return a manufactured nodeInfo representing a node with no pods, // zero capacity, and the default labels. -func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) { - if n, err := kl.GetNode(); err == nil { +func (kl *Kubelet) getNodeAnyWay(ctx context.Context) (*v1.Node, error) { + if n, err := kl.GetNode(ctx); err == nil { return n, nil } - return kl.initialNode(context.TODO()) + return kl.initialNode(ctx) } // GetNodeConfig returns the container manager node config. @@ -317,8 +317,8 @@ func (kl *Kubelet) GetPodCgroupRoot() string { // getHostIPsAnyWay attempts to return the host IPs from kubelet's nodeInfo, or // the initialNode. -func (kl *Kubelet) getHostIPsAnyWay() ([]net.IP, error) { - node, err := kl.getNodeAnyWay() +func (kl *Kubelet) getHostIPsAnyWay(ctx context.Context) ([]net.IP, error) { + node, err := kl.getNodeAnyWay(ctx) if err != nil { return nil, err } @@ -467,8 +467,9 @@ func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) { } // getLastStableNodeAddresses returns the last observed node addresses. -func (kl *Kubelet) getLastObservedNodeAddresses(logger klog.Logger) []v1.NodeAddress { - node, err := kl.GetNode() +func (kl *Kubelet) getLastObservedNodeAddresses(ctx context.Context) []v1.NodeAddress { + logger := klog.FromContext(ctx) + node, err := kl.GetNode(ctx) if err != nil || node == nil { logger.V(4).Info("fail to obtain node from local cache", "node", kl.nodeName, "error", err) return nil diff --git a/pkg/kubelet/kubelet_getters_test.go b/pkg/kubelet/kubelet_getters_test.go index d9710ace39f..6b098acc87e 100644 --- a/pkg/kubelet/kubelet_getters_test.go +++ b/pkg/kubelet/kubelet_getters_test.go @@ -286,7 +286,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + _, ctx := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -297,7 +297,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) { nodeLister.nodes = append(nodeLister.nodes, tc.node) } kl.nodeLister = nodeLister - addrs := kl.getLastObservedNodeAddresses(logger) + addrs := kl.getLastObservedNodeAddresses(ctx) if len(addrs) != len(tc.expectedAddrs) { t.Errorf("expected %d addresses, got %d", len(tc.expectedAddrs), len(addrs)) diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 669cdf25bef..97314f64223 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -388,7 +388,7 @@ func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (comp return true } - originalNode, err := kl.GetNode() + originalNode, err := kl.GetNode(ctx) if err != nil { logger.Error(err, "Error getting the current node from lister") return false diff --git a/pkg/kubelet/kubelet_nodecache.go b/pkg/kubelet/kubelet_nodecache.go index 5491d944c8d..0d0ac097c53 100644 --- a/pkg/kubelet/kubelet_nodecache.go +++ b/pkg/kubelet/kubelet_nodecache.go @@ -38,7 +38,7 @@ func (kl *Kubelet) GetCachedNode(ctx context.Context, useCache bool) (*v1.Node, // and returns the newer one. func (kl *Kubelet) getCachedNode(ctx context.Context) (*v1.Node, error) { logger := klog.FromContext(ctx) - informerNode, err := kl.GetNode() + informerNode, err := kl.GetNode(ctx) if err != nil { if kl.cachedNode != nil { logger.Error(err, "failed to list node; using cached node") diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 83b8f696e1d..24dc690f1d8 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -649,7 +649,7 @@ func (kl *Kubelet) GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, } opts.Devices = append(opts.Devices, blkVolumes...) - envs, err := kl.makeEnvironmentVariables(logger, pod, container, podIP, podIPs, volumes) + envs, err := kl.makeEnvironmentVariables(ctx, pod, container, podIP, podIPs, volumes) if err != nil { return nil, nil, err } @@ -731,7 +731,8 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string, enableServiceLinks bool) (map[ } // Make the environment variables for a pod in the given namespace. -func (kl *Kubelet) makeEnvironmentVariables(logger klog.Logger, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.EnvVar, error) { +func (kl *Kubelet) makeEnvironmentVariables(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.EnvVar, error) { + logger := klog.FromContext(ctx) if pod.Spec.EnableServiceLinks == nil { return nil, fmt.Errorf("nil pod.spec.enableServiceLinks encountered, cannot construct envvars") } @@ -851,12 +852,12 @@ func (kl *Kubelet) makeEnvironmentVariables(logger klog.Logger, pod *v1.Pod, con // Step 1b: resolve alternate env var sources switch { case envVar.ValueFrom.FieldRef != nil: - runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod, podIP, podIPs) + runtimeVal, err = kl.podFieldSelectorRuntimeValue(ctx, envVar.ValueFrom.FieldRef, pod, podIP, podIPs) if err != nil { return result, err } case envVar.ValueFrom.ResourceFieldRef != nil: - defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardAPI(pod, container) + defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardAPI(ctx, pod, container) if err != nil { return result, err } @@ -982,7 +983,7 @@ func (kl *Kubelet) makeEnvironmentVariables(logger klog.Logger, pod *v1.Pod, con // podFieldSelectorRuntimeValue returns the runtime value of the given // selector for a pod. -func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *v1.ObjectFieldSelector, pod *v1.Pod, podIP string, podIPs []string) (string, error) { +func (kl *Kubelet) podFieldSelectorRuntimeValue(ctx context.Context, fs *v1.ObjectFieldSelector, pod *v1.Pod, podIP string, podIPs []string) (string, error) { internalFieldPath, _, err := podshelper.ConvertDownwardAPIFieldLabel(fs.APIVersion, fs.FieldPath, "") if err != nil { return "", err @@ -1000,13 +1001,13 @@ func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *v1.ObjectFieldSelector, pod case "spec.serviceAccountName": return pod.Spec.ServiceAccountName, nil case "status.hostIP": - hostIPs, err := kl.getHostIPsAnyWay() + hostIPs, err := kl.getHostIPsAnyWay(ctx) if err != nil { return "", err } return hostIPs[0].String(), nil case "status.hostIPs": - hostIPs, err := kl.getHostIPsAnyWay() + hostIPs, err := kl.getHostIPsAnyWay(ctx) if err != nil { return "", err } @@ -1991,7 +1992,7 @@ func (kl *Kubelet) generateAPIPodStatus(ctx context.Context, pod *v1.Pod, podSta } // set HostIP/HostIPs and initialize PodIP/PodIPs for host network pods if kl.kubeClient != nil { - hostIPs, err := kl.getHostIPsAnyWay() + hostIPs, err := kl.getHostIPsAnyWay(ctx) if err != nil { logger.V(4).Info("Cannot get host IPs", "err", err) } else { diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 75844060793..9e4fb47f0b8 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -396,7 +396,7 @@ func buildService(name, namespace, clusterIP, protocol string, port int) *v1.Ser } func TestMakeEnvironmentVariables(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + tCtx := ktesting.Init(t) trueVal := true services := []*v1.Service{ buildService("kubernetes", metav1.NamespaceDefault, "1.2.3.1", "TCP", 8081), @@ -2036,7 +2036,7 @@ func TestMakeEnvironmentVariables(t *testing.T) { testPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file" } - result, err := kl.makeEnvironmentVariables(logger, testPod, tc.container, podIP, tc.podIPs, kubecontainer.VolumeMap{}) + result, err := kl.makeEnvironmentVariables(tCtx, testPod, tc.container, podIP, tc.podIPs, kubecontainer.VolumeMap{}) select { case e := <-fakeRecorder.Events: assert.Equal(t, tc.expectedEvent, e) @@ -7770,7 +7770,7 @@ func (tvm *testVolumeMounter) GetPath() string { } func TestMakeEnvironmentVariablesWithFileKeyRef(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + tCtx := ktesting.Init(t) // Create a temporary directory for test files tmpDir, err := os.MkdirTemp("", "filekeyref-test") require.NoError(t, err) @@ -8126,7 +8126,7 @@ func TestMakeEnvironmentVariablesWithFileKeyRef(t *testing.T) { }, } - envs, err := kl.makeEnvironmentVariables(logger, pod, tc.container, "192.168.1.1", []string{"192.168.1.1"}, tc.podVolumes) + envs, err := kl.makeEnvironmentVariables(tCtx, pod, tc.container, "192.168.1.1", []string{"192.168.1.1"}, tc.podVolumes) if tc.expectedError { require.Error(t, err) diff --git a/pkg/kubelet/kubelet_resources.go b/pkg/kubelet/kubelet_resources.go index 0a7a1ad1052..462cbd99655 100644 --- a/pkg/kubelet/kubelet_resources.go +++ b/pkg/kubelet/kubelet_resources.go @@ -17,6 +17,7 @@ limitations under the License. package kubelet import ( + "context" "fmt" "k8s.io/klog/v2" @@ -35,12 +36,12 @@ import ( // If a container has no limits specified, it defaults to the pod-level resources. // If neither container-level nor pod-level resources limits are specified, it defaults // to the node's allocatable resources. -func (kl *Kubelet) defaultPodLimitsForDownwardAPI(pod *corev1.Pod, container *corev1.Container) (*corev1.Pod, *corev1.Container, error) { +func (kl *Kubelet) defaultPodLimitsForDownwardAPI(ctx context.Context, pod *corev1.Pod, container *corev1.Container) (*corev1.Pod, *corev1.Container, error) { if pod == nil { return nil, nil, fmt.Errorf("invalid input, pod cannot be nil") } - node, err := kl.getNodeAnyWay() + node, err := kl.getNodeAnyWay(ctx) if err != nil { return nil, nil, fmt.Errorf("failed to find node object, expected a node") } diff --git a/pkg/kubelet/kubelet_resources_test.go b/pkg/kubelet/kubelet_resources_test.go index a39bd4d0784..11c69f79390 100644 --- a/pkg/kubelet/kubelet_resources_test.go +++ b/pkg/kubelet/kubelet_resources_test.go @@ -28,9 +28,11 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" kubefeatures "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestPodResourceLimitsDefaulting(t *testing.T) { + tCtx := ktesting.Init(t) tk := newTestKubelet(t, true) defer tk.Cleanup() tk.kubelet.nodeLister = &testNodeLister{ @@ -98,7 +100,7 @@ func TestPodResourceLimitsDefaulting(t *testing.T) { as := assert.New(t) for idx, tc := range cases { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodLevelResources, tc.podLevelResourcesEnabled) - actual, _, err := tk.kubelet.defaultPodLimitsForDownwardAPI(tc.pod, nil) + actual, _, err := tk.kubelet.defaultPodLimitsForDownwardAPI(tCtx, tc.pod, nil) as.NoError(err, "failed to default pod limits: %v", err) if !apiequality.Semantic.DeepEqual(tc.expected, actual) { as.Fail("test case [%d] failed. Expected: %+v, Got: %+v", idx, tc.expected, actual) diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 97091722ef5..33f981baa76 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -271,7 +271,7 @@ func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace } // Unused functions -func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil } +func (*fakeKubelet) GetNode(context.Context) (*v1.Node, error) { return nil, nil } func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} } func (*fakeKubelet) GetPodCgroupRoot() string { return "" } func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false } diff --git a/pkg/kubelet/server/stats/handler.go b/pkg/kubelet/server/stats/handler.go index 61b7576c9f8..1ac76920c9e 100644 --- a/pkg/kubelet/server/stats/handler.go +++ b/pkg/kubelet/server/stats/handler.go @@ -79,7 +79,7 @@ type Provider interface { // namespace. GetPodByName(namespace, name string) (*v1.Pod, bool) // GetNode returns the spec of the local node. - GetNode() (*v1.Node, error) + GetNode(context.Context) (*v1.Node, error) // GetNodeConfig returns the configuration of the local node. GetNodeConfig() cm.NodeConfig // ListVolumesForPod returns the stats of the volume used by the pod with diff --git a/pkg/kubelet/server/stats/summary.go b/pkg/kubelet/server/stats/summary.go index d371b7042c3..35b95754fee 100644 --- a/pkg/kubelet/server/stats/summary.go +++ b/pkg/kubelet/server/stats/summary.go @@ -72,7 +72,7 @@ func NewSummaryProvider(ctx context.Context, statsProvider Provider) SummaryProv func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) { // TODO(timstclair): Consider returning a best-effort response if any of // the following errors occur. - node, err := sp.provider.GetNode() + node, err := sp.provider.GetNode(ctx) if err != nil { return nil, fmt.Errorf("failed to get node info: %v", err) } @@ -129,7 +129,7 @@ func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*stat func (sp *summaryProviderImpl) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) { // TODO(timstclair): Consider returning a best-effort response if any of // the following errors occur. - node, err := sp.provider.GetNode() + node, err := sp.provider.GetNode(ctx) if err != nil { return nil, fmt.Errorf("failed to get node info: %v", err) } diff --git a/pkg/kubelet/server/stats/summary_test.go b/pkg/kubelet/server/stats/summary_test.go index 54dfecbe565..30aa1b8a68e 100644 --- a/pkg/kubelet/server/stats/summary_test.go +++ b/pkg/kubelet/server/stats/summary_test.go @@ -76,7 +76,7 @@ func TestSummaryProviderGetStatsNoSplitFileSystem(t *testing.T) { mockStatsProvider := statstest.NewMockProvider(t) - mockStatsProvider.EXPECT().GetNode().Return(node, nil) + mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil) mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig) mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot) mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe() @@ -177,7 +177,7 @@ func TestSummaryProviderGetStatsSplitImageFs(t *testing.T) { mockStatsProvider := statstest.NewMockProvider(t) - mockStatsProvider.EXPECT().GetNode().Return(node, nil) + mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil) mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig) mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot) mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe() @@ -277,7 +277,7 @@ func TestSummaryProviderGetCPUAndMemoryStats(t *testing.T) { mockStatsProvider := statstest.NewMockProvider(t) - mockStatsProvider.EXPECT().GetNode().Return(node, nil) + mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil) mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig) mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot) mockStatsProvider.EXPECT().ListPodCPUAndMemoryStats(ctx).Return(podStats, nil) diff --git a/pkg/kubelet/server/stats/summary_windows_test.go b/pkg/kubelet/server/stats/summary_windows_test.go index 3fcc5291b29..a31953d7e52 100644 --- a/pkg/kubelet/server/stats/summary_windows_test.go +++ b/pkg/kubelet/server/stats/summary_windows_test.go @@ -54,7 +54,7 @@ func TestSummaryProvider(t *testing.T) { assert := assert.New(t) mockStatsProvider := statstest.NewMockProvider(t) - mockStatsProvider.EXPECT().GetNode().Return(node, nil).Maybe() + mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil).Maybe() mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig).Maybe() mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot).Maybe() mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe() diff --git a/pkg/kubelet/server/stats/testing/mocks.go b/pkg/kubelet/server/stats/testing/mocks.go index f1742d3ca6d..de7a0608b8a 100644 --- a/pkg/kubelet/server/stats/testing/mocks.go +++ b/pkg/kubelet/server/stats/testing/mocks.go @@ -205,8 +205,8 @@ func (_c *MockProvider_GetCgroupStats_Call) RunAndReturn(run func(cgroupName str } // GetNode provides a mock function for the type MockProvider -func (_mock *MockProvider) GetNode() (*v1.Node, error) { - ret := _mock.Called() +func (_mock *MockProvider) GetNode(context1 context.Context) (*v1.Node, error) { + ret := _mock.Called(context1) if len(ret) == 0 { panic("no return value specified for GetNode") @@ -214,18 +214,18 @@ func (_mock *MockProvider) GetNode() (*v1.Node, error) { var r0 *v1.Node var r1 error - if returnFunc, ok := ret.Get(0).(func() (*v1.Node, error)); ok { - return returnFunc() + if returnFunc, ok := ret.Get(0).(func(context.Context) (*v1.Node, error)); ok { + return returnFunc(context1) } - if returnFunc, ok := ret.Get(0).(func() *v1.Node); ok { - r0 = returnFunc() + if returnFunc, ok := ret.Get(0).(func(context.Context) *v1.Node); ok { + r0 = returnFunc(context1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*v1.Node) } } - if returnFunc, ok := ret.Get(1).(func() error); ok { - r1 = returnFunc() + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(context1) } else { r1 = ret.Error(1) } @@ -238,13 +238,20 @@ type MockProvider_GetNode_Call struct { } // GetNode is a helper method to define mock.On call -func (_e *MockProvider_Expecter) GetNode() *MockProvider_GetNode_Call { - return &MockProvider_GetNode_Call{Call: _e.mock.On("GetNode")} +// - context1 context.Context +func (_e *MockProvider_Expecter) GetNode(context1 interface{}) *MockProvider_GetNode_Call { + return &MockProvider_GetNode_Call{Call: _e.mock.On("GetNode", context1)} } -func (_c *MockProvider_GetNode_Call) Run(run func()) *MockProvider_GetNode_Call { +func (_c *MockProvider_GetNode_Call) Run(run func(context1 context.Context)) *MockProvider_GetNode_Call { _c.Call.Run(func(args mock.Arguments) { - run() + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) }) return _c } @@ -254,7 +261,7 @@ func (_c *MockProvider_GetNode_Call) Return(node *v1.Node, err error) *MockProvi return _c } -func (_c *MockProvider_GetNode_Call) RunAndReturn(run func() (*v1.Node, error)) *MockProvider_GetNode_Call { +func (_c *MockProvider_GetNode_Call) RunAndReturn(run func(context1 context.Context) (*v1.Node, error)) *MockProvider_GetNode_Call { _c.Call.Return(run) return _c } diff --git a/pkg/kubelet/util/manager/cache_based_manager.go b/pkg/kubelet/util/manager/cache_based_manager.go index 5f478cf3f57..65595cb52d6 100644 --- a/pkg/kubelet/util/manager/cache_based_manager.go +++ b/pkg/kubelet/util/manager/cache_based_manager.go @@ -17,6 +17,7 @@ limitations under the License. package manager import ( + "context" "fmt" "strconv" "sync" @@ -129,9 +130,11 @@ func (s *objectStore) DeleteReference(namespace, name string, _ types.UID) { // GetObjectTTLFromNodeFunc returns a function that returns TTL value // from a given Node object. -func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { +func GetObjectTTLFromNodeFunc(getNode func(context.Context) (*v1.Node, error)) GetObjectTTLFunc { return func() (time.Duration, bool) { - node, err := getNode() + // TODO: Pass context from upper level instead of using context.Background(). + // This requires changing the GetObjectTTLFunc signature to accept a context parameter. + node, err := getNode(context.Background()) if err != nil { return time.Duration(0), false } diff --git a/pkg/kubelet/util/manager/cache_based_manager_test.go b/pkg/kubelet/util/manager/cache_based_manager_test.go index 38e174651a6..d386033fb08 100644 --- a/pkg/kubelet/util/manager/cache_based_manager_test.go +++ b/pkg/kubelet/util/manager/cache_based_manager_test.go @@ -319,7 +319,7 @@ func TestParseNodeAnnotation(t *testing.T) { }, } for i, testCase := range testCases { - getNode := func() (*v1.Node, error) { return testCase.node, testCase.err } + getNode := func(context.Context) (*v1.Node, error) { return testCase.node, testCase.err } ttl, exists := GetObjectTTLFromNodeFunc(getNode)() if exists != testCase.exists { t.Errorf("%d: incorrect parsing: %t", i, exists) diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 16bafb37735..5dd56947777 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -220,7 +220,8 @@ func (kvh *kubeletVolumeHost) GetMounter() mount.Interface { } func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) { - node, err := kvh.kubelet.getNodeAnyWay() + // TODO: Pass proper context when VolumeHost interface methods support context parameters + node, err := kvh.kubelet.getNodeAnyWay(context.TODO()) if err != nil { return nil, fmt.Errorf("error retrieving node: %v", err) } @@ -266,7 +267,8 @@ func (kvh *kubeletVolumeHost) GetPodCertificateCredentialBundle(ctx context.Cont } func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) { - node, err := kvh.kubelet.GetNode() + // TODO: Pass proper context when VolumeHost interface methods support context parameters + node, err := kvh.kubelet.GetNode(context.TODO()) if err != nil { return nil, fmt.Errorf("error retrieving node: %v", err) } @@ -274,7 +276,8 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) { } func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) { - node, err := kvh.kubelet.GetNode() + // TODO: Pass proper context when VolumeHost interface methods support context parameters + node, err := kvh.kubelet.GetNode(context.TODO()) if err != nil { return nil, fmt.Errorf("error retrieving node: %v", err) }