mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-22 18:08:54 -04:00
Merge pull request #136413 from hoteye/migrate-kubelet-getnode-context
kubelet: migrate kubelet_getters.go to contextual logging
This commit is contained in:
commit
62cbba593b
23 changed files with 85 additions and 66 deletions
|
|
@ -60,7 +60,7 @@ const (
|
|||
|
||||
type ActivePodsFunc func() []*v1.Pod
|
||||
|
||||
type GetNodeFunc func() (*v1.Node, error)
|
||||
type GetNodeFunc func(context.Context) (*v1.Node, error)
|
||||
|
||||
// Manages the containers running on a machine.
|
||||
type ContainerManager interface {
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ const defaultWipingDelay = 30 * time.Second
|
|||
type ActivePodsFunc func() []*v1.Pod
|
||||
|
||||
// GetNodeFunc is a function that returns the node object using the kubelet's node lister.
|
||||
type GetNodeFunc func() (*v1.Node, error)
|
||||
type GetNodeFunc func(context.Context) (*v1.Node, error)
|
||||
|
||||
// Manager is responsible for managing ResourceClaims.
|
||||
// It ensures that they are prepared before starting pods
|
||||
|
|
|
|||
|
|
@ -707,7 +707,7 @@ func TestGetResources(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func getFakeNode() (*v1.Node, error) {
|
||||
func getFakeNode(context.Context) (*v1.Node, error) {
|
||||
return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ type DRAPluginManager struct {
|
|||
backgroundCtx context.Context
|
||||
cancel func(err error)
|
||||
kubeClient kubernetes.Interface
|
||||
getNode func() (*v1.Node, error)
|
||||
getNode func(context.Context) (*v1.Node, error)
|
||||
wipingDelay time.Duration
|
||||
streamHandler StreamHandler
|
||||
|
||||
|
|
@ -146,7 +146,7 @@ func (m *monitoredPlugin) HandleConn(_ context.Context, stats grpcstats.ConnStat
|
|||
// The context can be used to cancel all background activities.
|
||||
// If desired, Stop can be called in addition or instead of canceling
|
||||
// the context. It then also waits for background activities to stop.
|
||||
func NewDRAPluginManager(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), streamHandler StreamHandler, wipingDelay time.Duration) *DRAPluginManager {
|
||||
func NewDRAPluginManager(ctx context.Context, kubeClient kubernetes.Interface, getNode func(context.Context) (*v1.Node, error), streamHandler StreamHandler, wipingDelay time.Duration) *DRAPluginManager {
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
pm := &DRAPluginManager{
|
||||
backgroundCtx: klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "DRA registration handler")),
|
||||
|
|
@ -228,7 +228,7 @@ func (pm *DRAPluginManager) wipeResourceSlices(ctx context.Context, driver strin
|
|||
|
||||
// Error logging is done inside the loop. Context cancellation doesn't get logged.
|
||||
_ = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
|
||||
node, err := pm.getNode()
|
||||
node, err := pm.getNode(ctx)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
|
|
@ -47,7 +48,7 @@ const (
|
|||
pluginB = "pluginB"
|
||||
)
|
||||
|
||||
func getFakeNode() (*v1.Node, error) {
|
||||
func getFakeNode(context.Context) (*v1.Node, error) {
|
||||
return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -901,7 +901,7 @@ func NewMainKubelet(ctx context.Context,
|
|||
if kubeDeps.TLSOptions != nil {
|
||||
if kubeCfg.ServerTLSBootstrap && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
|
||||
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, func() []v1.NodeAddress {
|
||||
return klet.getLastObservedNodeAddresses(logger)
|
||||
return klet.getLastObservedNodeAddresses(ctx)
|
||||
}, certDirectory)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize certificate manager: %w", err)
|
||||
|
|
@ -1742,7 +1742,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) {
|
|||
// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
|
||||
kl.StatsProvider.GetCgroupStats("/", true)
|
||||
// Start container manager.
|
||||
node, err := kl.getNodeAnyWay()
|
||||
node, err := kl.getNodeAnyWay(ctx)
|
||||
if err != nil {
|
||||
// Fail kubelet and rely on the babysitter to retry starting kubelet.
|
||||
klog.ErrorS(err, "Kubelet failed to get node info")
|
||||
|
|
@ -3338,7 +3338,7 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror
|
|||
}
|
||||
}
|
||||
if mirrorPod == nil || deleted {
|
||||
node, err := kl.GetNode()
|
||||
node, err := kl.GetNode(ctx)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
|
||||
} else if node.DeletionTimestamp != nil {
|
||||
|
|
@ -3355,7 +3355,7 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror
|
|||
// Ensure Mirror Pod for Static Pod exists as soon as node is registered.
|
||||
func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
|
||||
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||
_, err := kl.GetNode()
|
||||
_, err := kl.GetNode(ctx)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -286,9 +286,9 @@ func (kl *Kubelet) getRuntime() kubecontainer.Runtime {
|
|||
}
|
||||
|
||||
// GetNode returns the node info for the configured node name of this Kubelet.
|
||||
func (kl *Kubelet) GetNode() (*v1.Node, error) {
|
||||
func (kl *Kubelet) GetNode(ctx context.Context) (*v1.Node, error) {
|
||||
if kl.kubeClient == nil {
|
||||
return kl.initialNode(context.TODO())
|
||||
return kl.initialNode(ctx)
|
||||
}
|
||||
return kl.nodeLister.Get(string(kl.nodeName))
|
||||
}
|
||||
|
|
@ -298,11 +298,11 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
|
|||
// Return kubelet's nodeInfo for this node, except on error or if in standalone mode,
|
||||
// in which case return a manufactured nodeInfo representing a node with no pods,
|
||||
// zero capacity, and the default labels.
|
||||
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
|
||||
if n, err := kl.GetNode(); err == nil {
|
||||
func (kl *Kubelet) getNodeAnyWay(ctx context.Context) (*v1.Node, error) {
|
||||
if n, err := kl.GetNode(ctx); err == nil {
|
||||
return n, nil
|
||||
}
|
||||
return kl.initialNode(context.TODO())
|
||||
return kl.initialNode(ctx)
|
||||
}
|
||||
|
||||
// GetNodeConfig returns the container manager node config.
|
||||
|
|
@ -317,8 +317,8 @@ func (kl *Kubelet) GetPodCgroupRoot() string {
|
|||
|
||||
// getHostIPsAnyWay attempts to return the host IPs from kubelet's nodeInfo, or
|
||||
// the initialNode.
|
||||
func (kl *Kubelet) getHostIPsAnyWay() ([]net.IP, error) {
|
||||
node, err := kl.getNodeAnyWay()
|
||||
func (kl *Kubelet) getHostIPsAnyWay(ctx context.Context) ([]net.IP, error) {
|
||||
node, err := kl.getNodeAnyWay(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -467,8 +467,9 @@ func (kl *Kubelet) setCachedMachineInfo(info *cadvisorapiv1.MachineInfo) {
|
|||
}
|
||||
|
||||
// getLastStableNodeAddresses returns the last observed node addresses.
|
||||
func (kl *Kubelet) getLastObservedNodeAddresses(logger klog.Logger) []v1.NodeAddress {
|
||||
node, err := kl.GetNode()
|
||||
func (kl *Kubelet) getLastObservedNodeAddresses(ctx context.Context) []v1.NodeAddress {
|
||||
logger := klog.FromContext(ctx)
|
||||
node, err := kl.GetNode(ctx)
|
||||
if err != nil || node == nil {
|
||||
logger.V(4).Info("fail to obtain node from local cache", "node", kl.nodeName, "error", err)
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -286,7 +286,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) {
|
|||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
kl := testKubelet.kubelet
|
||||
|
|
@ -297,7 +297,7 @@ func Test_getLastObservedNodeAddresses(t *testing.T) {
|
|||
nodeLister.nodes = append(nodeLister.nodes, tc.node)
|
||||
}
|
||||
kl.nodeLister = nodeLister
|
||||
addrs := kl.getLastObservedNodeAddresses(logger)
|
||||
addrs := kl.getLastObservedNodeAddresses(ctx)
|
||||
|
||||
if len(addrs) != len(tc.expectedAddrs) {
|
||||
t.Errorf("expected %d addresses, got %d", len(tc.expectedAddrs), len(addrs))
|
||||
|
|
|
|||
|
|
@ -388,7 +388,7 @@ func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (comp
|
|||
return true
|
||||
}
|
||||
|
||||
originalNode, err := kl.GetNode()
|
||||
originalNode, err := kl.GetNode(ctx)
|
||||
if err != nil {
|
||||
logger.Error(err, "Error getting the current node from lister")
|
||||
return false
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ func (kl *Kubelet) GetCachedNode(ctx context.Context, useCache bool) (*v1.Node,
|
|||
// and returns the newer one.
|
||||
func (kl *Kubelet) getCachedNode(ctx context.Context) (*v1.Node, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
informerNode, err := kl.GetNode()
|
||||
informerNode, err := kl.GetNode(ctx)
|
||||
if err != nil {
|
||||
if kl.cachedNode != nil {
|
||||
logger.Error(err, "failed to list node; using cached node")
|
||||
|
|
|
|||
|
|
@ -649,7 +649,7 @@ func (kl *Kubelet) GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod,
|
|||
}
|
||||
opts.Devices = append(opts.Devices, blkVolumes...)
|
||||
|
||||
envs, err := kl.makeEnvironmentVariables(logger, pod, container, podIP, podIPs, volumes)
|
||||
envs, err := kl.makeEnvironmentVariables(ctx, pod, container, podIP, podIPs, volumes)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -731,7 +731,8 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string, enableServiceLinks bool) (map[
|
|||
}
|
||||
|
||||
// Make the environment variables for a pod in the given namespace.
|
||||
func (kl *Kubelet) makeEnvironmentVariables(logger klog.Logger, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.EnvVar, error) {
|
||||
func (kl *Kubelet) makeEnvironmentVariables(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.EnvVar, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
if pod.Spec.EnableServiceLinks == nil {
|
||||
return nil, fmt.Errorf("nil pod.spec.enableServiceLinks encountered, cannot construct envvars")
|
||||
}
|
||||
|
|
@ -851,12 +852,12 @@ func (kl *Kubelet) makeEnvironmentVariables(logger klog.Logger, pod *v1.Pod, con
|
|||
// Step 1b: resolve alternate env var sources
|
||||
switch {
|
||||
case envVar.ValueFrom.FieldRef != nil:
|
||||
runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod, podIP, podIPs)
|
||||
runtimeVal, err = kl.podFieldSelectorRuntimeValue(ctx, envVar.ValueFrom.FieldRef, pod, podIP, podIPs)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
case envVar.ValueFrom.ResourceFieldRef != nil:
|
||||
defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardAPI(pod, container)
|
||||
defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardAPI(ctx, pod, container)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
|
@ -982,7 +983,7 @@ func (kl *Kubelet) makeEnvironmentVariables(logger klog.Logger, pod *v1.Pod, con
|
|||
|
||||
// podFieldSelectorRuntimeValue returns the runtime value of the given
|
||||
// selector for a pod.
|
||||
func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *v1.ObjectFieldSelector, pod *v1.Pod, podIP string, podIPs []string) (string, error) {
|
||||
func (kl *Kubelet) podFieldSelectorRuntimeValue(ctx context.Context, fs *v1.ObjectFieldSelector, pod *v1.Pod, podIP string, podIPs []string) (string, error) {
|
||||
internalFieldPath, _, err := podshelper.ConvertDownwardAPIFieldLabel(fs.APIVersion, fs.FieldPath, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
|
@ -1000,13 +1001,13 @@ func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *v1.ObjectFieldSelector, pod
|
|||
case "spec.serviceAccountName":
|
||||
return pod.Spec.ServiceAccountName, nil
|
||||
case "status.hostIP":
|
||||
hostIPs, err := kl.getHostIPsAnyWay()
|
||||
hostIPs, err := kl.getHostIPsAnyWay(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hostIPs[0].String(), nil
|
||||
case "status.hostIPs":
|
||||
hostIPs, err := kl.getHostIPsAnyWay()
|
||||
hostIPs, err := kl.getHostIPsAnyWay(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
@ -1991,7 +1992,7 @@ func (kl *Kubelet) generateAPIPodStatus(ctx context.Context, pod *v1.Pod, podSta
|
|||
}
|
||||
// set HostIP/HostIPs and initialize PodIP/PodIPs for host network pods
|
||||
if kl.kubeClient != nil {
|
||||
hostIPs, err := kl.getHostIPsAnyWay()
|
||||
hostIPs, err := kl.getHostIPsAnyWay(ctx)
|
||||
if err != nil {
|
||||
logger.V(4).Info("Cannot get host IPs", "err", err)
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -396,7 +396,7 @@ func buildService(name, namespace, clusterIP, protocol string, port int) *v1.Ser
|
|||
}
|
||||
|
||||
func TestMakeEnvironmentVariables(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
tCtx := ktesting.Init(t)
|
||||
trueVal := true
|
||||
services := []*v1.Service{
|
||||
buildService("kubernetes", metav1.NamespaceDefault, "1.2.3.1", "TCP", 8081),
|
||||
|
|
@ -2036,7 +2036,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
|
|||
testPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
|
||||
}
|
||||
|
||||
result, err := kl.makeEnvironmentVariables(logger, testPod, tc.container, podIP, tc.podIPs, kubecontainer.VolumeMap{})
|
||||
result, err := kl.makeEnvironmentVariables(tCtx, testPod, tc.container, podIP, tc.podIPs, kubecontainer.VolumeMap{})
|
||||
select {
|
||||
case e := <-fakeRecorder.Events:
|
||||
assert.Equal(t, tc.expectedEvent, e)
|
||||
|
|
@ -7770,7 +7770,7 @@ func (tvm *testVolumeMounter) GetPath() string {
|
|||
}
|
||||
|
||||
func TestMakeEnvironmentVariablesWithFileKeyRef(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
tCtx := ktesting.Init(t)
|
||||
// Create a temporary directory for test files
|
||||
tmpDir, err := os.MkdirTemp("", "filekeyref-test")
|
||||
require.NoError(t, err)
|
||||
|
|
@ -8126,7 +8126,7 @@ func TestMakeEnvironmentVariablesWithFileKeyRef(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
envs, err := kl.makeEnvironmentVariables(logger, pod, tc.container, "192.168.1.1", []string{"192.168.1.1"}, tc.podVolumes)
|
||||
envs, err := kl.makeEnvironmentVariables(tCtx, pod, tc.container, "192.168.1.1", []string{"192.168.1.1"}, tc.podVolumes)
|
||||
|
||||
if tc.expectedError {
|
||||
require.Error(t, err)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package kubelet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
|
@ -35,12 +36,12 @@ import (
|
|||
// If a container has no limits specified, it defaults to the pod-level resources.
|
||||
// If neither container-level nor pod-level resources limits are specified, it defaults
|
||||
// to the node's allocatable resources.
|
||||
func (kl *Kubelet) defaultPodLimitsForDownwardAPI(pod *corev1.Pod, container *corev1.Container) (*corev1.Pod, *corev1.Container, error) {
|
||||
func (kl *Kubelet) defaultPodLimitsForDownwardAPI(ctx context.Context, pod *corev1.Pod, container *corev1.Container) (*corev1.Pod, *corev1.Container, error) {
|
||||
if pod == nil {
|
||||
return nil, nil, fmt.Errorf("invalid input, pod cannot be nil")
|
||||
}
|
||||
|
||||
node, err := kl.getNodeAnyWay()
|
||||
node, err := kl.getNodeAnyWay(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to find node object, expected a node")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,9 +28,11 @@ import (
|
|||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
func TestPodResourceLimitsDefaulting(t *testing.T) {
|
||||
tCtx := ktesting.Init(t)
|
||||
tk := newTestKubelet(t, true)
|
||||
defer tk.Cleanup()
|
||||
tk.kubelet.nodeLister = &testNodeLister{
|
||||
|
|
@ -98,7 +100,7 @@ func TestPodResourceLimitsDefaulting(t *testing.T) {
|
|||
as := assert.New(t)
|
||||
for idx, tc := range cases {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodLevelResources, tc.podLevelResourcesEnabled)
|
||||
actual, _, err := tk.kubelet.defaultPodLimitsForDownwardAPI(tc.pod, nil)
|
||||
actual, _, err := tk.kubelet.defaultPodLimitsForDownwardAPI(tCtx, tc.pod, nil)
|
||||
as.NoError(err, "failed to default pod limits: %v", err)
|
||||
if !apiequality.Semantic.DeepEqual(tc.expected, actual) {
|
||||
as.Fail("test case [%d] failed. Expected: %+v, Got: %+v", idx, tc.expected, actual)
|
||||
|
|
|
|||
|
|
@ -271,7 +271,7 @@ func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace
|
|||
}
|
||||
|
||||
// Unused functions
|
||||
func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
|
||||
func (*fakeKubelet) GetNode(context.Context) (*v1.Node, error) { return nil, nil }
|
||||
func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
|
||||
func (*fakeKubelet) GetPodCgroupRoot() string { return "" }
|
||||
func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ type Provider interface {
|
|||
// namespace.
|
||||
GetPodByName(namespace, name string) (*v1.Pod, bool)
|
||||
// GetNode returns the spec of the local node.
|
||||
GetNode() (*v1.Node, error)
|
||||
GetNode(context.Context) (*v1.Node, error)
|
||||
// GetNodeConfig returns the configuration of the local node.
|
||||
GetNodeConfig() cm.NodeConfig
|
||||
// ListVolumesForPod returns the stats of the volume used by the pod with
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ func NewSummaryProvider(ctx context.Context, statsProvider Provider) SummaryProv
|
|||
func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) {
|
||||
// TODO(timstclair): Consider returning a best-effort response if any of
|
||||
// the following errors occur.
|
||||
node, err := sp.provider.GetNode()
|
||||
node, err := sp.provider.GetNode(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get node info: %v", err)
|
||||
}
|
||||
|
|
@ -129,7 +129,7 @@ func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*stat
|
|||
func (sp *summaryProviderImpl) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
|
||||
// TODO(timstclair): Consider returning a best-effort response if any of
|
||||
// the following errors occur.
|
||||
node, err := sp.provider.GetNode()
|
||||
node, err := sp.provider.GetNode(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get node info: %v", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ func TestSummaryProviderGetStatsNoSplitFileSystem(t *testing.T) {
|
|||
|
||||
mockStatsProvider := statstest.NewMockProvider(t)
|
||||
|
||||
mockStatsProvider.EXPECT().GetNode().Return(node, nil)
|
||||
mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil)
|
||||
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig)
|
||||
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot)
|
||||
mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe()
|
||||
|
|
@ -177,7 +177,7 @@ func TestSummaryProviderGetStatsSplitImageFs(t *testing.T) {
|
|||
|
||||
mockStatsProvider := statstest.NewMockProvider(t)
|
||||
|
||||
mockStatsProvider.EXPECT().GetNode().Return(node, nil)
|
||||
mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil)
|
||||
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig)
|
||||
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot)
|
||||
mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe()
|
||||
|
|
@ -277,7 +277,7 @@ func TestSummaryProviderGetCPUAndMemoryStats(t *testing.T) {
|
|||
|
||||
mockStatsProvider := statstest.NewMockProvider(t)
|
||||
|
||||
mockStatsProvider.EXPECT().GetNode().Return(node, nil)
|
||||
mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil)
|
||||
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig)
|
||||
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot)
|
||||
mockStatsProvider.EXPECT().ListPodCPUAndMemoryStats(ctx).Return(podStats, nil)
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ func TestSummaryProvider(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
mockStatsProvider := statstest.NewMockProvider(t)
|
||||
mockStatsProvider.EXPECT().GetNode().Return(node, nil).Maybe()
|
||||
mockStatsProvider.EXPECT().GetNode(ctx).Return(node, nil).Maybe()
|
||||
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig).Maybe()
|
||||
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot).Maybe()
|
||||
mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe()
|
||||
|
|
|
|||
|
|
@ -205,8 +205,8 @@ func (_c *MockProvider_GetCgroupStats_Call) RunAndReturn(run func(cgroupName str
|
|||
}
|
||||
|
||||
// GetNode provides a mock function for the type MockProvider
|
||||
func (_mock *MockProvider) GetNode() (*v1.Node, error) {
|
||||
ret := _mock.Called()
|
||||
func (_mock *MockProvider) GetNode(context1 context.Context) (*v1.Node, error) {
|
||||
ret := _mock.Called(context1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for GetNode")
|
||||
|
|
@ -214,18 +214,18 @@ func (_mock *MockProvider) GetNode() (*v1.Node, error) {
|
|||
|
||||
var r0 *v1.Node
|
||||
var r1 error
|
||||
if returnFunc, ok := ret.Get(0).(func() (*v1.Node, error)); ok {
|
||||
return returnFunc()
|
||||
if returnFunc, ok := ret.Get(0).(func(context.Context) (*v1.Node, error)); ok {
|
||||
return returnFunc(context1)
|
||||
}
|
||||
if returnFunc, ok := ret.Get(0).(func() *v1.Node); ok {
|
||||
r0 = returnFunc()
|
||||
if returnFunc, ok := ret.Get(0).(func(context.Context) *v1.Node); ok {
|
||||
r0 = returnFunc(context1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*v1.Node)
|
||||
}
|
||||
}
|
||||
if returnFunc, ok := ret.Get(1).(func() error); ok {
|
||||
r1 = returnFunc()
|
||||
if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok {
|
||||
r1 = returnFunc(context1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
|
@ -238,13 +238,20 @@ type MockProvider_GetNode_Call struct {
|
|||
}
|
||||
|
||||
// GetNode is a helper method to define mock.On call
|
||||
func (_e *MockProvider_Expecter) GetNode() *MockProvider_GetNode_Call {
|
||||
return &MockProvider_GetNode_Call{Call: _e.mock.On("GetNode")}
|
||||
// - context1 context.Context
|
||||
func (_e *MockProvider_Expecter) GetNode(context1 interface{}) *MockProvider_GetNode_Call {
|
||||
return &MockProvider_GetNode_Call{Call: _e.mock.On("GetNode", context1)}
|
||||
}
|
||||
|
||||
func (_c *MockProvider_GetNode_Call) Run(run func()) *MockProvider_GetNode_Call {
|
||||
func (_c *MockProvider_GetNode_Call) Run(run func(context1 context.Context)) *MockProvider_GetNode_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
var arg0 context.Context
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(context.Context)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
|
@ -254,7 +261,7 @@ func (_c *MockProvider_GetNode_Call) Return(node *v1.Node, err error) *MockProvi
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProvider_GetNode_Call) RunAndReturn(run func() (*v1.Node, error)) *MockProvider_GetNode_Call {
|
||||
func (_c *MockProvider_GetNode_Call) RunAndReturn(run func(context1 context.Context) (*v1.Node, error)) *MockProvider_GetNode_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
|
@ -129,9 +130,11 @@ func (s *objectStore) DeleteReference(namespace, name string, _ types.UID) {
|
|||
|
||||
// GetObjectTTLFromNodeFunc returns a function that returns TTL value
|
||||
// from a given Node object.
|
||||
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
||||
func GetObjectTTLFromNodeFunc(getNode func(context.Context) (*v1.Node, error)) GetObjectTTLFunc {
|
||||
return func() (time.Duration, bool) {
|
||||
node, err := getNode()
|
||||
// TODO: Pass context from upper level instead of using context.Background().
|
||||
// This requires changing the GetObjectTTLFunc signature to accept a context parameter.
|
||||
node, err := getNode(context.Background())
|
||||
if err != nil {
|
||||
return time.Duration(0), false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -319,7 +319,7 @@ func TestParseNodeAnnotation(t *testing.T) {
|
|||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
getNode := func() (*v1.Node, error) { return testCase.node, testCase.err }
|
||||
getNode := func(context.Context) (*v1.Node, error) { return testCase.node, testCase.err }
|
||||
ttl, exists := GetObjectTTLFromNodeFunc(getNode)()
|
||||
if exists != testCase.exists {
|
||||
t.Errorf("%d: incorrect parsing: %t", i, exists)
|
||||
|
|
|
|||
|
|
@ -220,7 +220,8 @@ func (kvh *kubeletVolumeHost) GetMounter() mount.Interface {
|
|||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
|
||||
node, err := kvh.kubelet.getNodeAnyWay()
|
||||
// TODO: Pass proper context when VolumeHost interface methods support context parameters
|
||||
node, err := kvh.kubelet.getNodeAnyWay(context.TODO())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error retrieving node: %v", err)
|
||||
}
|
||||
|
|
@ -266,7 +267,8 @@ func (kvh *kubeletVolumeHost) GetPodCertificateCredentialBundle(ctx context.Cont
|
|||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
|
||||
node, err := kvh.kubelet.GetNode()
|
||||
// TODO: Pass proper context when VolumeHost interface methods support context parameters
|
||||
node, err := kvh.kubelet.GetNode(context.TODO())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error retrieving node: %v", err)
|
||||
}
|
||||
|
|
@ -274,7 +276,8 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
|
|||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
|
||||
node, err := kvh.kubelet.GetNode()
|
||||
// TODO: Pass proper context when VolumeHost interface methods support context parameters
|
||||
node, err := kvh.kubelet.GetNode(context.TODO())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error retrieving node: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue