From 5d20dc55bfdbc48ec371be7b2decf41ea67f1ece Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Thu, 10 Jul 2025 22:15:24 +0700 Subject: [PATCH] chore(kubelet): migrate pluginmanager to contextual logging --- pkg/kubelet/cm/devicemanager/manager_test.go | 9 ++- pkg/kubelet/kubelet.go | 8 ++- .../cache/actual_state_of_world.go | 9 ++- .../cache/actual_state_of_world_test.go | 13 ++-- .../cache/desired_state_of_world.go | 9 ++- .../cache/desired_state_of_world_test.go | 15 +++-- .../pluginmanager/metrics/metrics_test.go | 6 +- .../operationexecutor/operation_executor.go | 14 ++-- .../operation_executor_test.go | 29 +++++---- .../operationexecutor/operation_generator.go | 37 +++++++---- pkg/kubelet/pluginmanager/plugin_manager.go | 17 +++-- .../pluginmanager/plugin_manager_test.go | 8 ++- .../pluginwatcher/example_handler.go | 27 ++++---- .../pluginwatcher/example_plugin.go | 23 ++++--- .../pluginwatcher/plugin_watcher.go | 64 +++++++++++-------- .../pluginwatcher/plugin_watcher_test.go | 27 +++++--- .../pluginmanager/reconciler/reconciler.go | 28 ++++---- .../reconciler/reconciler_test.go | 21 +++--- 18 files changed, 221 insertions(+), 143 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 4e882aa0057..3f4a1180744 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package devicemanager import ( + "context" "fmt" "os" "path/filepath" @@ -55,6 +56,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/pluginmanager" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -311,15 +313,16 @@ func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) plugin &record.FakeRecorder{}, ) - runPluginManager(pluginManager) + tCtx := ktesting.Init(t) + runPluginManager(tCtx, pluginManager) pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler()) return pluginManager } -func runPluginManager(pluginManager pluginmanager.PluginManager) { +func runPluginManager(ctx context.Context, pluginManager pluginmanager.PluginManager) { // FIXME: Replace sets.Set[string] with sets.Set[string] sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true }) - go pluginManager.Run(sourcesReady, wait.NeverStop) + go pluginManager.Run(ctx, sourcesReady, wait.NeverStop) } func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ce8d1e70a43..a0de7801912 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1637,6 +1637,10 @@ func (kl *Kubelet) initializeModules(ctx context.Context) error { // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up. func (kl *Kubelet) initializeRuntimeDependentModules() { + // Use context.TODO() because we currently do not have a proper context to pass in. + // Replace this with an appropriate context when refactoring this function to accept a context parameter. + ctx := context.TODO() + if err := kl.cadvisor.Start(); err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. 
klog.ErrorS(err, "Failed to start cAdvisor") @@ -1654,7 +1658,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { os.Exit(1) } // containerManager must start after cAdvisor because it needs filesystem capacity information - if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil { + if err := kl.containerManager.Start(ctx, node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. klog.ErrorS(err, "Failed to start ContainerManager") os.Exit(1) @@ -1675,7 +1679,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { // Start the plugin manager klog.V(4).InfoS("Starting plugin manager") - go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) + go kl.pluginManager.Run(ctx, kl.sourcesReady, wait.NeverStop) err = kl.shutdownManager.Start() if err != nil { diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index cdaa42c0a3a..c68c748bdc9 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -21,6 +21,7 @@ keep track of registered plugins. package cache import ( + "context" "fmt" "sync" "time" @@ -47,7 +48,7 @@ type ActualStateOfWorld interface { // because the plugin should have been unregistered in the reconciler and therefore // removed from the actual state of world cache first before adding it back into // the actual state of world cache again with the new timestamp - AddPlugin(pluginInfo PluginInfo) error + AddPlugin(ctx context.Context, pluginInfo PluginInfo) error // RemovePlugin deletes the plugin with the given socket path from the actual // state of world. 
@@ -92,15 +93,17 @@ type PluginInfo struct { Endpoint string } -func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { +func (asw *actualStateOfWorld) AddPlugin(ctx context.Context, pluginInfo PluginInfo) error { asw.Lock() defer asw.Unlock() + logger := klog.FromContext(ctx) + if pluginInfo.SocketPath == "" { return fmt.Errorf("socket path is empty") } if _, ok := asw.socketFileToInfo[pluginInfo.SocketPath]; ok { - klog.V(2).InfoS("Plugin exists in actual state cache", "path", pluginInfo.SocketPath) + logger.V(2).Info("Plugin exists in actual state cache", "path", pluginInfo.SocketPath) } asw.socketFileToInfo[pluginInfo.SocketPath] = pluginInfo return nil diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go index fcc463c4a74..b72e1041a3c 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/test/utils/ktesting" ) // Calls AddPlugin() to add a plugin @@ -30,6 +31,7 @@ import ( // Verifies PluginExistsWithCorrectUUID returns true for the plugin // Verifies PluginExistsWithCorrectTimestamp returns true for the plugin (excluded on Windows) func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) { + tCtx := ktesting.Init(t) pluginInfo := PluginInfo{ SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", Timestamp: time.Now(), @@ -38,7 +40,7 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) { Name: "test", } asw := NewActualStateOfWorld() - err := asw.AddPlugin(pluginInfo) + err := asw.AddPlugin(tCtx, pluginInfo) // Assert if err != nil { t.Fatalf("AddPlugin failed. 
Expected: Actual: <%v>", err) @@ -71,6 +73,7 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) { // Verifies PluginExistsWithCorrectUUID returns false // Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows) func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) { + tCtx := ktesting.Init(t) asw := NewActualStateOfWorld() pluginInfo := PluginInfo{ SocketPath: "", @@ -79,7 +82,7 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) { Handler: nil, Name: "test", } - err := asw.AddPlugin(pluginInfo) + err := asw.AddPlugin(tCtx, pluginInfo) require.EqualError(t, err, "socket path is empty") // Get registered plugins and check the newly added plugin is there @@ -106,6 +109,7 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) { // Verifies PluginExistsWithCorrectUUID returns false // Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows) func Test_ASW_RemovePlugin_Positive(t *testing.T) { + tCtx := ktesting.Init(t) // First, add a plugin asw := NewActualStateOfWorld() pluginInfo := PluginInfo{ @@ -115,7 +119,7 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) { Handler: nil, Name: "test", } - err := asw.AddPlugin(pluginInfo) + err := asw.AddPlugin(tCtx, pluginInfo) // Assert if err != nil { t.Fatalf("AddPlugin failed. Expected: Actual: <%v>", err) @@ -146,6 +150,7 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) { // Verifies PluginExistsWithCorrectUUID returns false for an existing // plugin with the wrong UUID func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) { + tCtx := ktesting.Init(t) // First, add a plugin asw := NewActualStateOfWorld() pluginInfo := PluginInfo{ @@ -155,7 +160,7 @@ func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) { Handler: nil, Name: "test", } - err := asw.AddPlugin(pluginInfo) + err := asw.AddPlugin(tCtx, pluginInfo) // Assert if err != nil { t.Fatalf("AddPlugin failed. 
Expected: Actual: <%v>", err) diff --git a/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go b/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go index 90e38ddd736..6316724a55e 100644 --- a/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go @@ -21,6 +21,7 @@ keep track of registered plugins. package cache import ( + "context" "errors" "fmt" "sync" @@ -38,7 +39,7 @@ type DesiredStateOfWorld interface { // AddOrUpdatePlugin add the given plugin in the cache if it doesn't already exist. // If it does exist in the cache, then the timestamp of the PluginInfo object in the cache will be updated. // An error will be returned if socketPath is empty. - AddOrUpdatePlugin(socketPath string) error + AddOrUpdatePlugin(ctx context.Context, socketPath string) error // RemovePlugin deletes the plugin with the given socket path from the desired // state of world. @@ -122,15 +123,17 @@ func errSuffix(err error) string { return errStr } -func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error { +func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(ctx context.Context, socketPath string) error { dsw.Lock() defer dsw.Unlock() + logger := klog.FromContext(ctx) + if socketPath == "" { return fmt.Errorf("socket path is empty") } if _, ok := dsw.socketFileToInfo[socketPath]; ok { - klog.V(2).InfoS("Plugin exists in desired state cache, timestamp will be updated", "path", socketPath) + logger.V(2).Info("Plugin exists in desired state cache, timestamp will be updated", "path", socketPath) } // Update the PluginInfo object. 
diff --git a/pkg/kubelet/pluginmanager/cache/desired_state_of_world_test.go b/pkg/kubelet/pluginmanager/cache/desired_state_of_world_test.go index 5b4f227c12e..f6a302ae6dd 100644 --- a/pkg/kubelet/pluginmanager/cache/desired_state_of_world_test.go +++ b/pkg/kubelet/pluginmanager/cache/desired_state_of_world_test.go @@ -20,15 +20,17 @@ import ( "testing" "github.com/stretchr/testify/require" + "k8s.io/kubernetes/test/utils/ktesting" ) // Calls AddOrUpdatePlugin() to add a plugin // Verifies newly added plugin exists in GetPluginsToRegister() // Verifies newly added plugin returns true for PluginExists() func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) { + tCtx := ktesting.Init(t) dsw := NewDesiredStateOfWorld() socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock" - err := dsw.AddOrUpdatePlugin(socketPath) + err := dsw.AddOrUpdatePlugin(tCtx, socketPath) // Assert if err != nil { t.Fatalf("AddOrUpdatePlugin failed. Expected: Actual: <%v>", err) @@ -53,10 +55,11 @@ func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) { // Verifies the timestamp the existing plugin is updated // Verifies newly added plugin returns true for PluginExists() func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) { + tCtx := ktesting.Init(t) dsw := NewDesiredStateOfWorld() socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock" // Adding the plugin for the first time - err := dsw.AddOrUpdatePlugin(socketPath) + err := dsw.AddOrUpdatePlugin(tCtx, socketPath) if err != nil { t.Fatalf("AddOrUpdatePlugin failed. Expected: Actual: <%v>", err) } @@ -72,7 +75,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) { oldUUID := dswPlugins[0].UUID // Adding the plugin again so that the timestamp will be updated - err = dsw.AddOrUpdatePlugin(socketPath) + err = dsw.AddOrUpdatePlugin(tCtx, socketPath) if err != nil { t.Fatalf("AddOrUpdatePlugin failed. 
Expected: Actual: <%v>", err) } @@ -95,9 +98,10 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) { // Verifies the plugin does not exist in GetPluginsToRegister() after AddOrUpdatePlugin() // Verifies the plugin returns false for PluginExists() func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) { + tCtx := ktesting.Init(t) dsw := NewDesiredStateOfWorld() socketPath := "" - err := dsw.AddOrUpdatePlugin(socketPath) + err := dsw.AddOrUpdatePlugin(tCtx, socketPath) require.EqualError(t, err, "socket path is empty") // Get pluginsToRegister and check the newly added plugin is there @@ -116,10 +120,11 @@ func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) { // Verifies newly removed plugin no longer exists in GetPluginsToRegister() // Verifies newly removed plugin returns false for PluginExists() func Test_DSW_RemovePlugin_Positive(t *testing.T) { + tCtx := ktesting.Init(t) // First, add a plugin dsw := NewDesiredStateOfWorld() socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock" - err := dsw.AddOrUpdatePlugin(socketPath) + err := dsw.AddOrUpdatePlugin(tCtx, socketPath) // Assert if err != nil { t.Fatalf("AddOrUpdatePlugin failed. 
Expected: Actual: <%v>", err) diff --git a/pkg/kubelet/pluginmanager/metrics/metrics_test.go b/pkg/kubelet/pluginmanager/metrics/metrics_test.go index 700a6396636..751e9a81cd4 100644 --- a/pkg/kubelet/pluginmanager/metrics/metrics_test.go +++ b/pkg/kubelet/pluginmanager/metrics/metrics_test.go @@ -21,22 +21,24 @@ import ( "testing" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestMetricCollection(t *testing.T) { + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() asw := cache.NewActualStateOfWorld() fakePlugin := cache.PluginInfo{ SocketPath: fmt.Sprintf("fake/path/plugin.sock"), } // Add one plugin to DesiredStateOfWorld - err := dsw.AddOrUpdatePlugin(fakePlugin.SocketPath) + err := dsw.AddOrUpdatePlugin(tCtx, fakePlugin.SocketPath) if err != nil { t.Fatalf("AddOrUpdatePlugin failed. Expected: Actual: <%v>", err) } // Add one plugin to ActualStateOfWorld - err = asw.AddPlugin(fakePlugin) + err = asw.AddPlugin(tCtx, fakePlugin) if err != nil { t.Fatalf("AddOrUpdatePlugin failed. Expected: Actual: <%v>", err) } diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go index a1bb6517e81..2330d85f460 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go @@ -21,6 +21,8 @@ limitations under the License. package operationexecutor import ( + "context" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" @@ -45,11 +47,11 @@ import ( type OperationExecutor interface { // RegisterPlugin registers the given plugin using a handler in the plugin handler map. // It then updates the actual state of the world to reflect that. 
- RegisterPlugin(socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error + RegisterPlugin(ctx context.Context, socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error // UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map. // It then updates the actual state of the world to reflect that. - UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error + UnregisterPlugin(ctx context.Context, pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error } // NewOperationExecutor returns a new instance of OperationExecutor. @@ -68,7 +70,7 @@ type ActualStateOfWorldUpdater interface { // AddPlugin add the given plugin in the cache if no existing plugin // in the cache has the same socket path. // An error will be returned if socketPath is empty. - AddPlugin(pluginInfo cache.PluginInfo) error + AddPlugin(ctx context.Context, pluginInfo cache.PluginInfo) error // RemovePlugin deletes the plugin with the given socket path from the actual // state of world. 
@@ -93,22 +95,24 @@ func (oe *operationExecutor) IsOperationPending(socketPath string) bool { } func (oe *operationExecutor) RegisterPlugin( + ctx context.Context, socketPath string, pluginUUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error { generatedOperation := - oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, pluginUUID, pluginHandlers, actualStateOfWorld) + oe.operationGenerator.GenerateRegisterPluginFunc(ctx, socketPath, pluginUUID, pluginHandlers, actualStateOfWorld) return oe.pendingOperations.Run( socketPath, generatedOperation) } func (oe *operationExecutor) UnregisterPlugin( + ctx context.Context, pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error { generatedOperation := - oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld) + oe.operationGenerator.GenerateUnregisterPluginFunc(ctx, pluginInfo, actualStateOfWorld) return oe.pendingOperations.Run( pluginInfo.SocketPath, generatedOperation) diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go index 573146bc7d2..f9e62c66ddc 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package operationexecutor import ( + "context" "fmt" "os" "strconv" @@ -27,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -46,10 +48,10 @@ func init() { } func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) { - ch, quit, oe := setup() + tCtx, ch, quit, oe := setup(t) for i := 0; i < numPluginsToRegister; i++ { socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) - err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */) + err := oe.RegisterPlugin(tCtx, socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */) assert.NoError(t, err) } if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) { @@ -58,15 +60,15 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) } func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { - ch, quit, oe := setup() + tCtx, ch, quit, oe := setup(t) socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) // First registration should not fail. - err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */) + err := oe.RegisterPlugin(tCtx, socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */) assert.NoError(t, err) for i := 1; i < numPluginsToRegister; i++ { - err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */) + err := oe.RegisterPlugin(tCtx, socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */) if err == nil { t.Fatalf("RegisterPlugin did not fail. 
Expected: Actual: ", socketPath) } @@ -78,12 +80,11 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { } func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) { - ch, quit, oe := setup() + tCtx, ch, quit, oe := setup(t) for i := 0; i < numPluginsToUnregister; i++ { socketPath := "socket-path" + strconv.Itoa(i) pluginInfo := cache.PluginInfo{SocketPath: socketPath} - oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */) - + _ = oe.UnregisterPlugin(tCtx, pluginInfo, nil /* actual state of the world updator */) } if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) { t.Fatalf("Unable to start unregister operations in Concurrent for plugins") @@ -91,12 +92,11 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin } func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) { - ch, quit, oe := setup() + tCtx, ch, quit, oe := setup(t) socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) for i := 0; i < numPluginsToUnregister; i++ { pluginInfo := cache.PluginInfo{SocketPath: socketPath} - oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */) - + _ = oe.UnregisterPlugin(tCtx, pluginInfo, nil /* actual state of the world updator */) } if !isOperationRunSerially(ch, quit) { t.Fatalf("Unable to start unregister operations serially for plugins") @@ -116,6 +116,7 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera } func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc( + ctx context.Context, socketPath string, pluginUUID types.UID, pluginHandlers map[string]cache.PluginHandler, @@ -129,6 +130,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc( } func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc( + ctx context.Context, pluginInfo cache.PluginInfo, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { opFunc := 
func() error { @@ -174,9 +176,10 @@ loop: return false } -func setup() (chan interface{}, chan interface{}, OperationExecutor) { +func setup(t *testing.T) (context.Context, chan interface{}, chan interface{}, OperationExecutor) { + tCtx := ktesting.Init(t) ch, quit := make(chan interface{}), make(chan interface{}) - return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit)) + return tCtx, ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit)) } // This function starts by writing to ch and blocks on the quit channel diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 7b6fbba5b50..745ca231547 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -62,6 +62,7 @@ func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator { type OperationGenerator interface { // Generates the RegisterPlugin function needed to perform the registration of a plugin GenerateRegisterPluginFunc( + ctx context.Context, socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, @@ -69,34 +70,39 @@ type OperationGenerator interface { // Generates the UnregisterPlugin function needed to perform the unregistration of a plugin GenerateUnregisterPluginFunc( + ctx context.Context, pluginInfo cache.PluginInfo, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error } func (og *operationGenerator) GenerateRegisterPluginFunc( + ctx context.Context, socketPath string, pluginUUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { registerPluginFunc := func() error { - client, conn, err := dial(socketPath, dialTimeoutDuration) + logger := klog.FromContext(ctx) + + client, conn, err := dial(ctx, socketPath, dialTimeoutDuration) if err != nil { return 
fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err) } defer conn.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + // Create separate context from parent context + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{}) + infoResp, err := client.GetInfo(ctxWithTimeout, ®isterapi.InfoRequest{}) if err != nil { return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err) } handler, ok := pluginHandlers[infoResp.Type] if !ok { - if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil { + if err := og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil { return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err) } return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath) @@ -106,14 +112,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( infoResp.Endpoint = socketPath } if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { - if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil { + if err = og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil { return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err) } return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed") } // We add the plugin to the actual state of world 
cache before calling a plugin consumer's Register handle // so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call. - err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{ + err = actualStateOfWorldUpdater.AddPlugin(ctx, cache.PluginInfo{ SocketPath: socketPath, UUID: pluginUUID, Handler: handler, @@ -121,14 +127,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( Endpoint: infoResp.Endpoint, }) if err != nil { - klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) + logger.Error(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) } if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil { - return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) + return og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) } // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate - if err := og.notifyPlugin(client, true, ""); err != nil { + if err := og.notifyPlugin(ctx, client, true, ""); err != nil { return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err) } return nil @@ -137,10 +143,13 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( } func (og *operationGenerator) GenerateUnregisterPluginFunc( + ctx context.Context, pluginInfo cache.PluginInfo, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { unregisterPluginFunc := func() error { + logger := klog.FromContext(ctx) + if pluginInfo.Handler == nil { return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath) } @@ -150,14 +159,14 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc( 
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint) - klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) + logger.V(4).Info("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) return nil } return unregisterPluginFunc } -func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error { - ctx, cancel := context.WithTimeout(context.Background(), notifyTimeoutDuration) +func (og *operationGenerator) notifyPlugin(ctx context.Context, client registerapi.RegistrationClient, registered bool, errStr string) error { + ctx, cancel := context.WithTimeout(ctx, notifyTimeoutDuration) defer cancel() status := &registerapi.RegistrationStatus{ @@ -177,8 +186,8 @@ func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient } // Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial -func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func dial(ctx context.Context, unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() c, err := grpc.DialContext(ctx, unixSocketPath, diff --git a/pkg/kubelet/pluginmanager/plugin_manager.go b/pkg/kubelet/pluginmanager/plugin_manager.go index e1491165feb..754c30ba315 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager.go +++ b/pkg/kubelet/pluginmanager/plugin_manager.go @@ -17,6 +17,7 @@ limitations under the License. package pluginmanager import ( + "context" "time" "k8s.io/apimachinery/pkg/util/runtime" @@ -34,7 +35,7 @@ import ( // need to be registered/deregistered and makes it so. 
type PluginManager interface { // Starts the plugin manager and all the asynchronous loops that it controls - Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) + Run(ctx context.Context, sourcesReady config.SourcesReady, stopCh <-chan struct{}) // AddHandler adds the given plugin handler for a specific plugin type, which // will be added to the actual state of world cache so that it can be passed to @@ -105,22 +106,24 @@ type pluginManager struct { var _ PluginManager = &pluginManager{} -func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { +func (pm *pluginManager) Run(ctx context.Context, sourcesReady config.SourcesReady, stopCh <-chan struct{}) { defer runtime.HandleCrash() - if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil { - klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!") + logger := klog.FromContext(ctx) + + if err := pm.desiredStateOfWorldPopulator.Start(ctx, stopCh); err != nil { + logger.Error(err, "The desired_state_of_world populator (plugin watcher) starts failed!") return } - klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts") + logger.V(2).Info("The desired_state_of_world populator (plugin watcher) starts") - klog.InfoS("Starting Kubelet Plugin Manager") + logger.Info("Starting Kubelet Plugin Manager") go pm.reconciler.Run(stopCh) metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld) <-stopCh - klog.InfoS("Shutting down Kubelet Plugin Manager") + logger.Info("Shutting down Kubelet Plugin Manager") } func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) { diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go index 99aa22ad7ae..9f1c737d604 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager_test.go +++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" 
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher" + "k8s.io/kubernetes/test/utils/ktesting" ) var ( @@ -141,6 +142,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio func TestPluginManager(t *testing.T) { defer cleanup(t) + tCtx := ktesting.Init(t) pluginManager := newTestPluginManager(socketDir) // Start the plugin manager @@ -148,7 +150,7 @@ func TestPluginManager(t *testing.T) { defer close(stopChan) go func() { sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true }) - pluginManager.Run(sourcesReady, stopChan) + pluginManager.Run(tCtx, sourcesReady, stopChan) }() // Add handler for device plugin @@ -170,14 +172,14 @@ func TestPluginManager(t *testing.T) { // Add a new plugin pluginName := fmt.Sprintf("example-plugin-%d", i) p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) - require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) // Verify that the plugin is registered waitForRegistration(t, fakeHandler, pluginName, socketPath) // And unregister. 
fakeHandler.Reset() - require.NoError(t, p.Stop()) + require.NoError(t, p.Stop(tCtx)) waitForDeRegistration(t, fakeHandler, pluginName, socketPath) } } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go b/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go index 3c845a7bc24..3f94ad95136 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go @@ -64,8 +64,8 @@ func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *ex } } -func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { - p.SendEvent(pluginName, exampleEventValidate) +func (p *exampleHandler) ValidatePlugin(ctx context.Context, pluginName string, endpoint string, versions []string) error { + p.SendEvent(ctx, pluginName, exampleEventValidate) n, ok := p.DecreasePluginCount(pluginName) if !ok && n > 0 { @@ -84,11 +84,11 @@ func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, vers return nil } -func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error { - p.SendEvent(pluginName, exampleEventRegister) +func (p *exampleHandler) RegisterPlugin(ctx context.Context, pluginName, endpoint string, versions []string) error { + p.SendEvent(ctx, pluginName, exampleEventRegister) // Verifies the grpcServer is ready to serve services. 
- _, conn, err := dial(endpoint, time.Second) + _, conn, err := dial(ctx, endpoint, time.Second) if err != nil { return fmt.Errorf("failed dialing endpoint (%s): %v", endpoint, err) } @@ -99,13 +99,13 @@ func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions [] v1beta2Client := v1beta2.NewExampleClient(conn) // Tests v1beta1 GetExampleInfo - _, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{}) + _, err = v1beta1Client.GetExampleInfo(ctx, &v1beta1.ExampleRequest{}) if err != nil { return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err) } // Tests v1beta1 GetExampleInfo - _, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{}) + _, err = v1beta2Client.GetExampleInfo(ctx, &v1beta2.ExampleRequest{}) if err != nil { return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err) } @@ -113,12 +113,13 @@ func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions [] return nil } -func (p *exampleHandler) DeRegisterPlugin(pluginName string) { - p.SendEvent(pluginName, exampleEventDeRegister) +func (p *exampleHandler) DeRegisterPlugin(ctx context.Context, pluginName string) { + p.SendEvent(ctx, pluginName, exampleEventDeRegister) } -func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) { - klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName]) +func (p *exampleHandler) SendEvent(ctx context.Context, pluginName string, event examplePluginEvent) { + logger := klog.FromContext(ctx) + logger.V(2).Info("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName]) p.eventChans[pluginName] <- event } @@ -135,8 +136,8 @@ func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok boo } // Dial establishes the gRPC communication with the picked up plugin socket. 
https://godoc.org/google.golang.org/grpc#Dial -func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func dial(ctx context.Context, unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() c, err := grpc.DialContext(ctx, unixSocketPath, diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go b/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go index c6be96c9ce4..b73a276df11 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go @@ -50,7 +50,8 @@ type pluginServiceV1Beta1 struct { } func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) { - klog.InfoS("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field) + logger := klog.FromContext(ctx) + logger.Info("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field) return &v1beta1.ExampleResponse{}, nil } @@ -63,7 +64,8 @@ type pluginServiceV1Beta2 struct { } func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) { - klog.InfoS("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field) + logger := klog.FromContext(ctx) + logger.Info("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field) return &v1beta2.ExampleResponse{}, nil } @@ -105,7 +107,8 @@ func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoReques } func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) { - klog.InfoS("Notify registration status", "status", status) + logger := klog.FromContext(ctx) + logger.Info("Notify registration status", 
"status", status) if e.registrationStatus != nil { e.registrationStatus <- *status @@ -115,14 +118,15 @@ func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *re } // Serve starts a pluginwatcher server and one or more of the plugin services -func (e *examplePlugin) Serve(services ...string) error { - klog.InfoS("Starting example server", "endpoint", e.endpoint) +func (e *examplePlugin) Serve(ctx context.Context, services ...string) error { + logger := klog.FromContext(ctx) + logger.Info("Starting example server", "endpoint", e.endpoint) lis, err := net.Listen("unix", e.endpoint) if err != nil { return err } - klog.InfoS("Example server started", "endpoint", e.endpoint) + logger.Info("Example server started", "endpoint", e.endpoint) e.grpcServer = grpc.NewServer() // Registers kubelet plugin watcher api. @@ -147,15 +151,16 @@ func (e *examplePlugin) Serve(services ...string) error { defer e.wg.Done() // Blocking call to accept incoming connections. if err := e.grpcServer.Serve(lis); err != nil { - klog.ErrorS(err, "Example server stopped serving") + logger.Error(err, "Example server stopped serving") } }() return nil } -func (e *examplePlugin) Stop() error { - klog.InfoS("Stopping example server", "endpoint", e.endpoint) +func (e *examplePlugin) Stop(ctx context.Context) error { + logger := klog.FromContext(ctx) + logger.Info("Stopping example server", "endpoint", e.endpoint) e.grpcServer.Stop() c := make(chan struct{}) diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go index 44a08012c0d..dc3e6a7d087 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go @@ -17,6 +17,7 @@ limitations under the License. 
package pluginwatcher import ( + "context" "fmt" "os" "strings" @@ -47,12 +48,13 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) * } // Start watches for the creation and deletion of plugin sockets at the path -func (w *Watcher) Start(stopCh <-chan struct{}) error { - klog.V(2).InfoS("Plugin Watcher Start", "path", w.path) +func (w *Watcher) Start(ctx context.Context, stopCh <-chan struct{}) error { + logger := klog.FromContext(ctx) + logger.V(2).Info("Plugin Watcher Start", "path", w.path) // Creating the directory to be watched if it doesn't exist yet, // and walks through the directory to discover the existing plugins. - if err := w.init(); err != nil { + if err := w.init(ctx); err != nil { return err } @@ -63,8 +65,8 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { w.fsWatcher = fsWatcher // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine. - if err := w.traversePluginDir(w.path); err != nil { - klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path) + if err := w.traversePluginDir(ctx, w.path); err != nil { + logger.Error(err, "Failed to traverse plugin socket path", "path", w.path) } go func(fsWatcher *fsnotify.Watcher) { @@ -73,17 +75,17 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { case event := <-fsWatcher.Events: //TODO: Handle errors by taking corrective measures if event.Has(fsnotify.Create) { - err := w.handleCreateEvent(event) + err := w.handleCreateEvent(ctx, event) if err != nil { - klog.ErrorS(err, "Error when handling create event", "event", event) + logger.Error(err, "Error when handling create event", "event", event) } } else if event.Has(fsnotify.Remove) { - w.handleDeleteEvent(event) + w.handleDeleteEvent(ctx, event) } continue case err := <-fsWatcher.Errors: if err != nil { - klog.ErrorS(err, "FsWatcher received error") + logger.Error(err, "FsWatcher received error") } continue case <-stopCh: @@ -96,8 +98,9 @@ func (w 
*Watcher) Start(stopCh <-chan struct{}) error { return nil } -func (w *Watcher) init() error { - klog.V(4).InfoS("Ensuring Plugin directory", "path", w.path) +func (w *Watcher) init(ctx context.Context) error { + logger := klog.FromContext(ctx) + logger.V(4).Info("Ensuring Plugin directory", "path", w.path) if err := w.fs.MkdirAll(w.path, 0755); err != nil { return fmt.Errorf("error (re-)creating root %s: %v", w.path, err) @@ -108,7 +111,9 @@ func (w *Watcher) init() error { // Walks through the plugin directory discover any existing plugin sockets. // Ignore all errors except root dir not being walkable -func (w *Watcher) traversePluginDir(dir string) error { +func (w *Watcher) traversePluginDir(ctx context.Context, dir string) error { + logger := klog.FromContext(ctx) + // watch the new dir err := w.fsWatcher.Add(dir) if err != nil { @@ -121,7 +126,7 @@ func (w *Watcher) traversePluginDir(dir string) error { return fmt.Errorf("error accessing path: %s error: %v", path, err) } - klog.ErrorS(err, "Error accessing path", "path", path) + logger.Error(err, "Error accessing path", "path", path) return nil } @@ -141,11 +146,11 @@ func (w *Watcher) traversePluginDir(dir string) error { Op: fsnotify.Create, } //TODO: Handle errors by taking corrective measures - if err := w.handleCreateEvent(event); err != nil { - klog.ErrorS(err, "Error when handling create", "event", event) + if err := w.handleCreateEvent(ctx, event); err != nil { + logger.Error(err, "Error when handling create", "event", event) } } else { - klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode) + logger.V(5).Info("Ignoring file", "path", path, "mode", mode) } return nil @@ -155,8 +160,9 @@ func (w *Watcher) traversePluginDir(dir string) error { // Handle filesystem notify event. // Files names: // - MUST NOT start with a '.' 
-func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { - klog.V(6).InfoS("Handling create event", "event", event) +func (w *Watcher) handleCreateEvent(ctx context.Context, event fsnotify.Event) error { + logger := klog.FromContext(ctx) + logger.V(6).Info("Handling create event", "event", event) fi, err := getStat(event) if err != nil { @@ -164,7 +170,7 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { } if strings.HasPrefix(fi.Name(), ".") { - klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name()) + logger.V(5).Info("Ignoring file (starts with '.')", "path", fi.Name()) return nil } @@ -174,35 +180,37 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err) } if !isSocket { - klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name()) + logger.V(5).Info("Ignoring non socket file", "path", fi.Name()) return nil } - return w.handlePluginRegistration(event.Name) + return w.handlePluginRegistration(ctx, event.Name) } - return w.traversePluginDir(event.Name) + return w.traversePluginDir(ctx, event.Name) } -func (w *Watcher) handlePluginRegistration(socketPath string) error { +func (w *Watcher) handlePluginRegistration(ctx context.Context, socketPath string) error { + logger := klog.FromContext(ctx) socketPath = getSocketPath(socketPath) // Update desired state of world list of plugins // If the socket path does exist in the desired world cache, there's still // a possibility that it has been deleted and recreated again before it is // removed from the desired world cache, so we still need to call AddOrUpdatePlugin // in this case to update the timestamp - klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath) - err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath) + logger.V(2).Info("Adding socket path or updating timestamp to desired state cache", "path", 
socketPath) + err := w.desiredStateOfWorld.AddOrUpdatePlugin(ctx, socketPath) if err != nil { return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err) } return nil } -func (w *Watcher) handleDeleteEvent(event fsnotify.Event) { - klog.V(6).InfoS("Handling delete event", "event", event) +func (w *Watcher) handleDeleteEvent(ctx context.Context, event fsnotify.Event) { + logger := klog.FromContext(ctx) + logger.V(6).Info("Handling delete event", "event", event) socketPath := event.Name - klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath) + logger.V(2).Info("Removing socket path from desired state cache", "path", socketPath) w.desiredStateOfWorld.RemovePlugin(socketPath) } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher_test.go b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher_test.go index d610d37a9ce..072fdbfcfca 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher_test.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/klog/v2" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" + "k8s.io/kubernetes/test/utils/ktesting" ) var ( @@ -39,7 +40,8 @@ var ( func init() { var logLevel string - klog.InitFlags(flag.CommandLine) + flags := &flag.FlagSet{} + klog.InitFlags(flags) flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) flag.StringVar(&logLevel, "logLevel", "6", "test") flag.Lookup("v").Value.Set(logLevel) @@ -107,6 +109,7 @@ func TestPluginRegistration(t *testing.T) { socketDir := initTempDir(t) defer os.RemoveAll(socketDir) + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() newWatcher(t, socketDir, dsw, wait.NeverStop) @@ -115,7 +118,7 @@ func TestPluginRegistration(t *testing.T) { pluginName := fmt.Sprintf("example-plugin-%d", i) p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, 
supportedVersions...) - require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) pluginInfo := GetPluginInfo(p) waitForRegistration(t, pluginInfo.SocketPath, dsw) @@ -127,7 +130,7 @@ func TestPluginRegistration(t *testing.T) { } // Stop the plugin; the plugin should be removed from the desired state of world cache - require.NoError(t, p.Stop()) + require.NoError(t, p.Stop(tCtx)) // The following doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event waitForUnregistration(t, pluginInfo.SocketPath, dsw) dswPlugins = dsw.GetPluginsToRegister() @@ -141,6 +144,7 @@ func TestPluginRegistrationSameName(t *testing.T) { socketDir := initTempDir(t) defer os.RemoveAll(socketDir) + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() newWatcher(t, socketDir, dsw, wait.NeverStop) @@ -150,7 +154,7 @@ func TestPluginRegistrationSameName(t *testing.T) { for i := 0; i < 10; i++ { socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i)) p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) - require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) pluginInfo := GetPluginInfo(p) waitForRegistration(t, pluginInfo.SocketPath, dsw) @@ -167,6 +171,7 @@ func TestPluginReRegistration(t *testing.T) { socketDir := initTempDir(t) defer os.RemoveAll(socketDir) + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() newWatcher(t, socketDir, dsw, wait.NeverStop) @@ -175,7 +180,7 @@ func TestPluginReRegistration(t *testing.T) { socketPath := filepath.Join(socketDir, "plugin-reregistration.sock") pluginName := "reregister-plugin" p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) 
- require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) pluginInfo := GetPluginInfo(p) lastTimestamp := time.Now() waitForRegistration(t, pluginInfo.SocketPath, dsw) @@ -185,13 +190,13 @@ func TestPluginReRegistration(t *testing.T) { for i := 0; i < 10; i++ { // Stop the plugin; the plugin should be removed from the desired state of world cache // The plugin removal doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event - require.NoError(t, p.Stop()) + require.NoError(t, p.Stop(tCtx)) waitForUnregistration(t, pluginInfo.SocketPath, dsw) // Add the plugin again pluginName := fmt.Sprintf("dep-example-plugin-%d", i) p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) - require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) waitForRegistration(t, pluginInfo.SocketPath, dsw) // Check the dsw cache. The updated plugin should be the only plugin in it @@ -210,6 +215,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { socketDir := initTempDir(t) defer os.RemoveAll(socketDir) + tCtx := ktesting.Init(t) plugins := make([]*examplePlugin, 10) for i := 0; i < len(plugins); i++ { @@ -217,9 +223,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { pluginName := fmt.Sprintf("example-plugin-%d", i) p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) 
- require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) defer func(p *examplePlugin) { - require.NoError(t, p.Stop()) + require.NoError(t, p.Stop(tCtx)) }(p) plugins[i] = p @@ -255,8 +261,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) { } func newWatcher(t *testing.T, socketDir string, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher { + tCtx := ktesting.Init(t) w := NewWatcher(socketDir, desiredStateOfWorldCache) - require.NoError(t, w.Start(stopCh)) + require.NoError(t, w.Start(tCtx, stopCh)) return w } diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler.go b/pkg/kubelet/pluginmanager/reconciler/reconciler.go index 730f9823fd9..083a81ac69b 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler.go @@ -20,6 +20,7 @@ limitations under the License. package reconciler import ( + "context" "sync" "time" @@ -83,8 +84,12 @@ type reconciler struct { var _ Reconciler = &reconciler{} func (rc *reconciler) Run(stopCh <-chan struct{}) { + // Use context.TODO() because we currently do not have a proper context to pass in. + // Replace this with an appropriate context when refactoring this function to accept a context parameter. + ctx := context.TODO() + wait.Until(func() { - rc.reconcile() + rc.reconcile(ctx) }, rc.loopSleepDuration, stopCh) @@ -108,7 +113,8 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler { return copyHandlers } -func (rc *reconciler) reconcile() { +func (rc *reconciler) reconcile(ctx context.Context) { + logger := klog.FromContext(ctx) // Unregisterations are triggered before registrations // Ensure plugins that should be unregistered are unregistered. @@ -123,7 +129,7 @@ func (rc *reconciler) reconcile() { // with the same socket path but different timestamp. 
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() { if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID { - klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin) + logger.V(5).Info("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin) unregisterPlugin = true break } @@ -131,17 +137,17 @@ func (rc *reconciler) reconcile() { } if unregisterPlugin { - klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin) - err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld) + logger.V(5).Info("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin) + err := rc.operationExecutor.UnregisterPlugin(ctx, registeredPlugin, rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. 
- klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin) + logger.Error(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin) } if err == nil { - klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin) + logger.V(1).Info("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin) } } } @@ -149,16 +155,16 @@ func (rc *reconciler) reconcile() { // Ensure plugins that should be registered are registered for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() { if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) { - klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister) - err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld) + logger.V(5).Info("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister) + err := rc.operationExecutor.RegisterPlugin(ctx, pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. 
- klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister) + logger.Error(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister) } if err == nil { - klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister) + logger.V(1).Info("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister) } } } diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go index 68e72ddcaae..f7b8822dccf 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -170,6 +171,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { func Test_Run_Positive_Register(t *testing.T) { defer cleanup(t) + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() asw := cache.NewActualStateOfWorld() di := NewDummyImpl() @@ -192,11 +194,12 @@ func Test_Run_Positive_Register(t *testing.T) { socketPath := filepath.Join(socketDir, "plugin.sock") pluginName := fmt.Sprintf("example-plugin") p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) 
- require.NoError(t, p.Serve("v1beta1", "v1beta2")) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) defer func() { - require.NoError(t, p.Stop()) + require.NoError(t, p.Stop(tCtx)) }() - dsw.AddOrUpdatePlugin(socketPath) + require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath)) + plugins := dsw.GetPluginsToRegister() waitForRegistration(t, socketPath, plugins[0].UUID, asw) @@ -218,6 +221,7 @@ func Test_Run_Positive_Register(t *testing.T) { func Test_Run_Positive_RegisterThenUnregister(t *testing.T) { defer cleanup(t) + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() asw := cache.NewActualStateOfWorld() di := NewDummyImpl() @@ -241,8 +245,8 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) { socketPath := filepath.Join(socketDir, "plugin.sock") pluginName := fmt.Sprintf("example-plugin") p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) - require.NoError(t, p.Serve("v1beta1", "v1beta2")) - dsw.AddOrUpdatePlugin(socketPath) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) + require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath)) plugins := dsw.GetPluginsToRegister() waitForRegistration(t, socketPath, plugins[0].UUID, asw) @@ -274,6 +278,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) { func Test_Run_Positive_ReRegister(t *testing.T) { defer cleanup(t) + tCtx := ktesting.Init(t) dsw := cache.NewDesiredStateOfWorld() asw := cache.NewActualStateOfWorld() di := NewDummyImpl() @@ -297,13 +302,13 @@ func Test_Run_Positive_ReRegister(t *testing.T) { socketPath := filepath.Join(socketDir, "plugin2.sock") pluginName := fmt.Sprintf("example-plugin2") p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...) 
- require.NoError(t, p.Serve("v1beta1", "v1beta2")) - dsw.AddOrUpdatePlugin(socketPath) + require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2")) + require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath)) plugins := dsw.GetPluginsToRegister() waitForRegistration(t, socketPath, plugins[0].UUID, asw) // Add the plugin again to update the timestamp - dsw.AddOrUpdatePlugin(socketPath) + require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath)) // This should trigger a deregistration and a regitration // The process of unregistration and reregistration can happen so fast that // we are not able to catch it with waitForUnregistration, so here we are checking