chore(kubelet): migrate pluginmanager to contextual logging

This commit is contained in:
phuhung273 2025-07-10 22:15:24 +07:00
parent 9822e51403
commit 5d20dc55bf
18 changed files with 221 additions and 143 deletions

View file

@ -17,6 +17,7 @@ limitations under the License.
package devicemanager
import (
"context"
"fmt"
"os"
"path/filepath"
@ -55,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@ -311,15 +313,16 @@ func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) plugin
&record.FakeRecorder{},
)
runPluginManager(pluginManager)
tCtx := ktesting.Init(t)
runPluginManager(tCtx, pluginManager)
pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
return pluginManager
}
func runPluginManager(pluginManager pluginmanager.PluginManager) {
func runPluginManager(ctx context.Context, pluginManager pluginmanager.PluginManager) {
// FIXME: Replace sets.Set[string] with sets.Set[string]
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
go pluginManager.Run(sourcesReady, wait.NeverStop)
go pluginManager.Run(ctx, sourcesReady, wait.NeverStop)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) {

View file

@ -1637,6 +1637,10 @@ func (kl *Kubelet) initializeModules(ctx context.Context) error {
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
if err := kl.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start cAdvisor")
@ -1654,7 +1658,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1)
}
// containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
if err := kl.containerManager.Start(ctx, node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start ContainerManager")
os.Exit(1)
@ -1675,7 +1679,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// Start the plugin manager
klog.V(4).InfoS("Starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
go kl.pluginManager.Run(ctx, kl.sourcesReady, wait.NeverStop)
err = kl.shutdownManager.Start()
if err != nil {

View file

@ -21,6 +21,7 @@ keep track of registered plugins.
package cache
import (
"context"
"fmt"
"sync"
"time"
@ -47,7 +48,7 @@ type ActualStateOfWorld interface {
// because the plugin should have been unregistered in the reconciler and therefore
// removed from the actual state of world cache first before adding it back into
// the actual state of world cache again with the new timestamp
AddPlugin(pluginInfo PluginInfo) error
AddPlugin(ctx context.Context, pluginInfo PluginInfo) error
// RemovePlugin deletes the plugin with the given socket path from the actual
// state of world.
@ -92,15 +93,17 @@ type PluginInfo struct {
Endpoint string
}
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {
func (asw *actualStateOfWorld) AddPlugin(ctx context.Context, pluginInfo PluginInfo) error {
asw.Lock()
defer asw.Unlock()
logger := klog.FromContext(ctx)
if pluginInfo.SocketPath == "" {
return fmt.Errorf("socket path is empty")
}
if _, ok := asw.socketFileToInfo[pluginInfo.SocketPath]; ok {
klog.V(2).InfoS("Plugin exists in actual state cache", "path", pluginInfo.SocketPath)
logger.V(2).Info("Plugin exists in actual state cache", "path", pluginInfo.SocketPath)
}
asw.socketFileToInfo[pluginInfo.SocketPath] = pluginInfo
return nil

View file

@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/test/utils/ktesting"
)
// Calls AddPlugin() to add a plugin
@ -30,6 +31,7 @@ import (
// Verifies PluginExistsWithCorrectUUID returns true for the plugin
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin (excluded on Windows)
func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
tCtx := ktesting.Init(t)
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
@ -38,7 +40,7 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
Name: "test",
}
asw := NewActualStateOfWorld()
err := asw.AddPlugin(pluginInfo)
err := asw.AddPlugin(tCtx, pluginInfo)
// Assert
if err != nil {
t.Fatalf("AddPlugin failed. Expected: <no error> Actual: <%v>", err)
@ -71,6 +73,7 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
// Verifies PluginExistsWithCorrectUUID returns false
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
tCtx := ktesting.Init(t)
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "",
@ -79,7 +82,7 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
err := asw.AddPlugin(tCtx, pluginInfo)
require.EqualError(t, err, "socket path is empty")
// Get registered plugins and check the newly added plugin is there
@ -106,6 +109,7 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
// Verifies PluginExistsWithCorrectUUID returns false
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
func Test_ASW_RemovePlugin_Positive(t *testing.T) {
tCtx := ktesting.Init(t)
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
@ -115,7 +119,7 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
err := asw.AddPlugin(tCtx, pluginInfo)
// Assert
if err != nil {
t.Fatalf("AddPlugin failed. Expected: <no error> Actual: <%v>", err)
@ -146,6 +150,7 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
// Verifies PluginExistsWithCorrectUUID returns false for an existing
// plugin with the wrong UUID
func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) {
tCtx := ktesting.Init(t)
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
@ -155,7 +160,7 @@ func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) {
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
err := asw.AddPlugin(tCtx, pluginInfo)
// Assert
if err != nil {
t.Fatalf("AddPlugin failed. Expected: <no error> Actual: <%v>", err)

View file

@ -21,6 +21,7 @@ keep track of registered plugins.
package cache
import (
"context"
"errors"
"fmt"
"sync"
@ -38,7 +39,7 @@ type DesiredStateOfWorld interface {
// AddOrUpdatePlugin add the given plugin in the cache if it doesn't already exist.
// If it does exist in the cache, then the timestamp of the PluginInfo object in the cache will be updated.
// An error will be returned if socketPath is empty.
AddOrUpdatePlugin(socketPath string) error
AddOrUpdatePlugin(ctx context.Context, socketPath string) error
// RemovePlugin deletes the plugin with the given socket path from the desired
// state of world.
@ -122,15 +123,17 @@ func errSuffix(err error) string {
return errStr
}
func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(ctx context.Context, socketPath string) error {
dsw.Lock()
defer dsw.Unlock()
logger := klog.FromContext(ctx)
if socketPath == "" {
return fmt.Errorf("socket path is empty")
}
if _, ok := dsw.socketFileToInfo[socketPath]; ok {
klog.V(2).InfoS("Plugin exists in desired state cache, timestamp will be updated", "path", socketPath)
logger.V(2).Info("Plugin exists in desired state cache, timestamp will be updated", "path", socketPath)
}
// Update the PluginInfo object.

View file

@ -20,15 +20,17 @@ import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/kubernetes/test/utils/ktesting"
)
// Calls AddOrUpdatePlugin() to add a plugin
// Verifies newly added plugin exists in GetPluginsToRegister()
// Verifies newly added plugin returns true for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
tCtx := ktesting.Init(t)
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
err := dsw.AddOrUpdatePlugin(socketPath)
err := dsw.AddOrUpdatePlugin(tCtx, socketPath)
// Assert
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
@ -53,10 +55,11 @@ func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
// Verifies the timestamp the existing plugin is updated
// Verifies newly added plugin returns true for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
tCtx := ktesting.Init(t)
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
// Adding the plugin for the first time
err := dsw.AddOrUpdatePlugin(socketPath)
err := dsw.AddOrUpdatePlugin(tCtx, socketPath)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
@ -72,7 +75,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
oldUUID := dswPlugins[0].UUID
// Adding the plugin again so that the timestamp will be updated
err = dsw.AddOrUpdatePlugin(socketPath)
err = dsw.AddOrUpdatePlugin(tCtx, socketPath)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
@ -95,9 +98,10 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
// Verifies the plugin does not exist in GetPluginsToRegister() after AddOrUpdatePlugin()
// Verifies the plugin returns false for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) {
tCtx := ktesting.Init(t)
dsw := NewDesiredStateOfWorld()
socketPath := ""
err := dsw.AddOrUpdatePlugin(socketPath)
err := dsw.AddOrUpdatePlugin(tCtx, socketPath)
require.EqualError(t, err, "socket path is empty")
// Get pluginsToRegister and check the newly added plugin is there
@ -116,10 +120,11 @@ func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) {
// Verifies newly removed plugin no longer exists in GetPluginsToRegister()
// Verifies newly removed plugin returns false for PluginExists()
func Test_DSW_RemovePlugin_Positive(t *testing.T) {
tCtx := ktesting.Init(t)
// First, add a plugin
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
err := dsw.AddOrUpdatePlugin(socketPath)
err := dsw.AddOrUpdatePlugin(tCtx, socketPath)
// Assert
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)

View file

@ -21,22 +21,24 @@ import (
"testing"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestMetricCollection(t *testing.T) {
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakePlugin := cache.PluginInfo{
SocketPath: fmt.Sprintf("fake/path/plugin.sock"),
}
// Add one plugin to DesiredStateOfWorld
err := dsw.AddOrUpdatePlugin(fakePlugin.SocketPath)
err := dsw.AddOrUpdatePlugin(tCtx, fakePlugin.SocketPath)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Add one plugin to ActualStateOfWorld
err = asw.AddPlugin(fakePlugin)
err = asw.AddPlugin(tCtx, fakePlugin)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}

View file

@ -21,6 +21,8 @@ limitations under the License.
package operationexecutor
import (
"context"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
@ -45,11 +47,11 @@ import (
type OperationExecutor interface {
// RegisterPlugin registers the given plugin using a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
RegisterPlugin(ctx context.Context, socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
UnregisterPlugin(ctx context.Context, pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
}
// NewOperationExecutor returns a new instance of OperationExecutor.
@ -68,7 +70,7 @@ type ActualStateOfWorldUpdater interface {
// AddPlugin add the given plugin in the cache if no existing plugin
// in the cache has the same socket path.
// An error will be returned if socketPath is empty.
AddPlugin(pluginInfo cache.PluginInfo) error
AddPlugin(ctx context.Context, pluginInfo cache.PluginInfo) error
// RemovePlugin deletes the plugin with the given socket path from the actual
// state of world.
@ -93,22 +95,24 @@ func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
}
func (oe *operationExecutor) RegisterPlugin(
ctx context.Context,
socketPath string,
pluginUUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, pluginUUID, pluginHandlers, actualStateOfWorld)
oe.operationGenerator.GenerateRegisterPluginFunc(ctx, socketPath, pluginUUID, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run(
socketPath, generatedOperation)
}
func (oe *operationExecutor) UnregisterPlugin(
ctx context.Context,
pluginInfo cache.PluginInfo,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)
oe.operationGenerator.GenerateUnregisterPluginFunc(ctx, pluginInfo, actualStateOfWorld)
return oe.pendingOperations.Run(
pluginInfo.SocketPath, generatedOperation)

View file

@ -17,6 +17,7 @@ limitations under the License.
package operationexecutor
import (
"context"
"fmt"
"os"
"strconv"
@ -27,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@ -46,10 +48,10 @@ func init() {
}
func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
tCtx, ch, quit, oe := setup(t)
for i := 0; i < numPluginsToRegister; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
err := oe.RegisterPlugin(tCtx, socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
assert.NoError(t, err)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
@ -58,15 +60,15 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
}
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
tCtx, ch, quit, oe := setup(t)
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
// First registration should not fail.
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
err := oe.RegisterPlugin(tCtx, socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
assert.NoError(t, err)
for i := 1; i < numPluginsToRegister; i++ {
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
err := oe.RegisterPlugin(tCtx, socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
if err == nil {
t.Fatalf("RegisterPlugin did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name is already executing.> Actual: <no error>", socketPath)
}
@ -78,12 +80,11 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
}
func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup()
tCtx, ch, quit, oe := setup(t)
for i := 0; i < numPluginsToUnregister; i++ {
socketPath := "socket-path" + strconv.Itoa(i)
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
_ = oe.UnregisterPlugin(tCtx, pluginInfo, nil /* actual state of the world updator */)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
t.Fatalf("Unable to start unregister operations in Concurrent for plugins")
@ -91,12 +92,11 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin
}
func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup()
tCtx, ch, quit, oe := setup(t)
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
for i := 0; i < numPluginsToUnregister; i++ {
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
_ = oe.UnregisterPlugin(tCtx, pluginInfo, nil /* actual state of the world updator */)
}
if !isOperationRunSerially(ch, quit) {
t.Fatalf("Unable to start unregister operations serially for plugins")
@ -116,6 +116,7 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
}
func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
ctx context.Context,
socketPath string,
pluginUUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
@ -129,6 +130,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
}
func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
ctx context.Context,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error {
@ -174,9 +176,10 @@ loop:
return false
}
func setup() (chan interface{}, chan interface{}, OperationExecutor) {
func setup(t *testing.T) (context.Context, chan interface{}, chan interface{}, OperationExecutor) {
tCtx := ktesting.Init(t)
ch, quit := make(chan interface{}), make(chan interface{})
return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
return tCtx, ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
}
// This function starts by writing to ch and blocks on the quit channel

View file

@ -62,6 +62,7 @@ func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
type OperationGenerator interface {
// Generates the RegisterPlugin function needed to perform the registration of a plugin
GenerateRegisterPluginFunc(
ctx context.Context,
socketPath string,
UUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
@ -69,34 +70,39 @@ type OperationGenerator interface {
// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
GenerateUnregisterPluginFunc(
ctx context.Context,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
}
func (og *operationGenerator) GenerateRegisterPluginFunc(
ctx context.Context,
socketPath string,
pluginUUID types.UID,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
registerPluginFunc := func() error {
client, conn, err := dial(socketPath, dialTimeoutDuration)
logger := klog.FromContext(ctx)
client, conn, err := dial(ctx, socketPath, dialTimeoutDuration)
if err != nil {
return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// Create separate context from parent context
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
infoResp, err := client.GetInfo(ctxWithTimeout, &registerapi.InfoRequest{})
if err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
}
handler, ok := pluginHandlers[infoResp.Type]
if !ok {
if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
if err := og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
@ -106,14 +112,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
infoResp.Endpoint = socketPath
}
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
if err = og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
}
// We add the plugin to the actual state of world cache before calling a plugin consumer's Register handle
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
err = actualStateOfWorldUpdater.AddPlugin(ctx, cache.PluginInfo{
SocketPath: socketPath,
UUID: pluginUUID,
Handler: handler,
@ -121,14 +127,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
Endpoint: infoResp.Endpoint,
})
if err != nil {
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
logger.Error(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
}
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
return og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
if err := og.notifyPlugin(client, true, ""); err != nil {
if err := og.notifyPlugin(ctx, client, true, ""); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
}
return nil
@ -137,10 +143,13 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
}
func (og *operationGenerator) GenerateUnregisterPluginFunc(
ctx context.Context,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
unregisterPluginFunc := func() error {
logger := klog.FromContext(ctx)
if pluginInfo.Handler == nil {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
}
@ -150,14 +159,14 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc(
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint)
klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
logger.V(4).Info("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
return nil
}
return unregisterPluginFunc
}
func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error {
ctx, cancel := context.WithTimeout(context.Background(), notifyTimeoutDuration)
func (og *operationGenerator) notifyPlugin(ctx context.Context, client registerapi.RegistrationClient, registered bool, errStr string) error {
ctx, cancel := context.WithTimeout(ctx, notifyTimeoutDuration)
defer cancel()
status := &registerapi.RegistrationStatus{
@ -177,8 +186,8 @@ func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient
}
// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func dial(ctx context.Context, unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath,

View file

@ -17,6 +17,7 @@ limitations under the License.
package pluginmanager
import (
"context"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
@ -34,7 +35,7 @@ import (
// need to be registered/deregistered and makes it so.
type PluginManager interface {
// Starts the plugin manager and all the asynchronous loops that it controls
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
Run(ctx context.Context, sourcesReady config.SourcesReady, stopCh <-chan struct{})
// AddHandler adds the given plugin handler for a specific plugin type, which
// will be added to the actual state of world cache so that it can be passed to
@ -105,22 +106,24 @@ type pluginManager struct {
var _ PluginManager = &pluginManager{}
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
func (pm *pluginManager) Run(ctx context.Context, sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
logger := klog.FromContext(ctx)
if err := pm.desiredStateOfWorldPopulator.Start(ctx, stopCh); err != nil {
logger.Error(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
return
}
klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")
logger.V(2).Info("The desired_state_of_world populator (plugin watcher) starts")
klog.InfoS("Starting Kubelet Plugin Manager")
logger.Info("Starting Kubelet Plugin Manager")
go pm.reconciler.Run(stopCh)
metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
<-stopCh
klog.InfoS("Shutting down Kubelet Plugin Manager")
logger.Info("Shutting down Kubelet Plugin Manager")
}
func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {

View file

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
"k8s.io/kubernetes/test/utils/ktesting"
)
var (
@ -141,6 +142,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
func TestPluginManager(t *testing.T) {
defer cleanup(t)
tCtx := ktesting.Init(t)
pluginManager := newTestPluginManager(socketDir)
// Start the plugin manager
@ -148,7 +150,7 @@ func TestPluginManager(t *testing.T) {
defer close(stopChan)
go func() {
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
pluginManager.Run(sourcesReady, stopChan)
pluginManager.Run(tCtx, sourcesReady, stopChan)
}()
// Add handler for device plugin
@ -170,14 +172,14 @@ func TestPluginManager(t *testing.T) {
// Add a new plugin
pluginName := fmt.Sprintf("example-plugin-%d", i)
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
// Verify that the plugin is registered
waitForRegistration(t, fakeHandler, pluginName, socketPath)
// And unregister.
fakeHandler.Reset()
require.NoError(t, p.Stop())
require.NoError(t, p.Stop(tCtx))
waitForDeRegistration(t, fakeHandler, pluginName, socketPath)
}
}

View file

@ -64,8 +64,8 @@ func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *ex
}
}
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventValidate)
func (p *exampleHandler) ValidatePlugin(ctx context.Context, pluginName string, endpoint string, versions []string) error {
p.SendEvent(ctx, pluginName, exampleEventValidate)
n, ok := p.DecreasePluginCount(pluginName)
if !ok && n > 0 {
@ -84,11 +84,11 @@ func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, vers
return nil
}
func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventRegister)
func (p *exampleHandler) RegisterPlugin(ctx context.Context, pluginName, endpoint string, versions []string) error {
p.SendEvent(ctx, pluginName, exampleEventRegister)
// Verifies the grpcServer is ready to serve services.
_, conn, err := dial(endpoint, time.Second)
_, conn, err := dial(ctx, endpoint, time.Second)
if err != nil {
return fmt.Errorf("failed dialing endpoint (%s): %v", endpoint, err)
}
@ -99,13 +99,13 @@ func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []
v1beta2Client := v1beta2.NewExampleClient(conn)
// Tests v1beta1 GetExampleInfo
_, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{})
_, err = v1beta1Client.GetExampleInfo(ctx, &v1beta1.ExampleRequest{})
if err != nil {
return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
}
// Tests v1beta1 GetExampleInfo
_, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{})
_, err = v1beta2Client.GetExampleInfo(ctx, &v1beta2.ExampleRequest{})
if err != nil {
return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
}
@ -113,12 +113,13 @@ func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []
return nil
}
func (p *exampleHandler) DeRegisterPlugin(pluginName string) {
p.SendEvent(pluginName, exampleEventDeRegister)
func (p *exampleHandler) DeRegisterPlugin(ctx context.Context, pluginName string) {
p.SendEvent(ctx, pluginName, exampleEventDeRegister)
}
func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) {
klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName])
func (p *exampleHandler) SendEvent(ctx context.Context, pluginName string, event examplePluginEvent) {
logger := klog.FromContext(ctx)
logger.V(2).Info("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName])
p.eventChans[pluginName] <- event
}
@ -135,8 +136,8 @@ func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok boo
}
// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func dial(ctx context.Context, unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath,

View file

@ -50,7 +50,8 @@ type pluginServiceV1Beta1 struct {
}
func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
klog.InfoS("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field)
logger := klog.FromContext(ctx)
logger.Info("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field)
return &v1beta1.ExampleResponse{}, nil
}
@ -63,7 +64,8 @@ type pluginServiceV1Beta2 struct {
}
func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
klog.InfoS("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field)
logger := klog.FromContext(ctx)
logger.Info("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field)
return &v1beta2.ExampleResponse{}, nil
}
@ -105,7 +107,8 @@ func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoReques
}
func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
klog.InfoS("Notify registration status", "status", status)
logger := klog.FromContext(ctx)
logger.Info("Notify registration status", "status", status)
if e.registrationStatus != nil {
e.registrationStatus <- *status
@ -115,14 +118,15 @@ func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *re
}
// Serve starts a pluginwatcher server and one or more of the plugin services
func (e *examplePlugin) Serve(services ...string) error {
klog.InfoS("Starting example server", "endpoint", e.endpoint)
func (e *examplePlugin) Serve(ctx context.Context, services ...string) error {
logger := klog.FromContext(ctx)
logger.Info("Starting example server", "endpoint", e.endpoint)
lis, err := net.Listen("unix", e.endpoint)
if err != nil {
return err
}
klog.InfoS("Example server started", "endpoint", e.endpoint)
logger.Info("Example server started", "endpoint", e.endpoint)
e.grpcServer = grpc.NewServer()
// Registers kubelet plugin watcher api.
@ -147,15 +151,16 @@ func (e *examplePlugin) Serve(services ...string) error {
defer e.wg.Done()
// Blocking call to accept incoming connections.
if err := e.grpcServer.Serve(lis); err != nil {
klog.ErrorS(err, "Example server stopped serving")
logger.Error(err, "Example server stopped serving")
}
}()
return nil
}
func (e *examplePlugin) Stop() error {
klog.InfoS("Stopping example server", "endpoint", e.endpoint)
func (e *examplePlugin) Stop(ctx context.Context) error {
logger := klog.FromContext(ctx)
logger.Info("Stopping example server", "endpoint", e.endpoint)
e.grpcServer.Stop()
c := make(chan struct{})

View file

@ -17,6 +17,7 @@ limitations under the License.
package pluginwatcher
import (
"context"
"fmt"
"os"
"strings"
@ -47,12 +48,13 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *
}
// Start watches for the creation and deletion of plugin sockets at the path
func (w *Watcher) Start(stopCh <-chan struct{}) error {
klog.V(2).InfoS("Plugin Watcher Start", "path", w.path)
func (w *Watcher) Start(ctx context.Context, stopCh <-chan struct{}) error {
logger := klog.FromContext(ctx)
logger.V(2).Info("Plugin Watcher Start", "path", w.path)
// Creating the directory to be watched if it doesn't exist yet,
// and walks through the directory to discover the existing plugins.
if err := w.init(); err != nil {
if err := w.init(ctx); err != nil {
return err
}
@ -63,8 +65,8 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
w.fsWatcher = fsWatcher
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
if err := w.traversePluginDir(w.path); err != nil {
klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
if err := w.traversePluginDir(ctx, w.path); err != nil {
logger.Error(err, "Failed to traverse plugin socket path", "path", w.path)
}
go func(fsWatcher *fsnotify.Watcher) {
@ -73,17 +75,17 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
case event := <-fsWatcher.Events:
//TODO: Handle errors by taking corrective measures
if event.Has(fsnotify.Create) {
err := w.handleCreateEvent(event)
err := w.handleCreateEvent(ctx, event)
if err != nil {
klog.ErrorS(err, "Error when handling create event", "event", event)
logger.Error(err, "Error when handling create event", "event", event)
}
} else if event.Has(fsnotify.Remove) {
w.handleDeleteEvent(event)
w.handleDeleteEvent(ctx, event)
}
continue
case err := <-fsWatcher.Errors:
if err != nil {
klog.ErrorS(err, "FsWatcher received error")
logger.Error(err, "FsWatcher received error")
}
continue
case <-stopCh:
@ -96,8 +98,9 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
return nil
}
func (w *Watcher) init() error {
klog.V(4).InfoS("Ensuring Plugin directory", "path", w.path)
func (w *Watcher) init(ctx context.Context) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Ensuring Plugin directory", "path", w.path)
if err := w.fs.MkdirAll(w.path, 0755); err != nil {
return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
@ -108,7 +111,9 @@ func (w *Watcher) init() error {
// Walks through the plugin directory discover any existing plugin sockets.
// Ignore all errors except root dir not being walkable
func (w *Watcher) traversePluginDir(dir string) error {
func (w *Watcher) traversePluginDir(ctx context.Context, dir string) error {
logger := klog.FromContext(ctx)
// watch the new dir
err := w.fsWatcher.Add(dir)
if err != nil {
@ -121,7 +126,7 @@ func (w *Watcher) traversePluginDir(dir string) error {
return fmt.Errorf("error accessing path: %s error: %v", path, err)
}
klog.ErrorS(err, "Error accessing path", "path", path)
logger.Error(err, "Error accessing path", "path", path)
return nil
}
@ -141,11 +146,11 @@ func (w *Watcher) traversePluginDir(dir string) error {
Op: fsnotify.Create,
}
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
klog.ErrorS(err, "Error when handling create", "event", event)
if err := w.handleCreateEvent(ctx, event); err != nil {
logger.Error(err, "Error when handling create", "event", event)
}
} else {
klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode)
logger.V(5).Info("Ignoring file", "path", path, "mode", mode)
}
return nil
@ -155,8 +160,9 @@ func (w *Watcher) traversePluginDir(dir string) error {
// Handle filesystem notify event.
// Files names:
// - MUST NOT start with a '.'
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).InfoS("Handling create event", "event", event)
func (w *Watcher) handleCreateEvent(ctx context.Context, event fsnotify.Event) error {
logger := klog.FromContext(ctx)
logger.V(6).Info("Handling create event", "event", event)
fi, err := getStat(event)
if err != nil {
@ -164,7 +170,7 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
}
if strings.HasPrefix(fi.Name(), ".") {
klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name())
logger.V(5).Info("Ignoring file (starts with '.')", "path", fi.Name())
return nil
}
@ -174,35 +180,37 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
}
if !isSocket {
klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name())
logger.V(5).Info("Ignoring non socket file", "path", fi.Name())
return nil
}
return w.handlePluginRegistration(event.Name)
return w.handlePluginRegistration(ctx, event.Name)
}
return w.traversePluginDir(event.Name)
return w.traversePluginDir(ctx, event.Name)
}
func (w *Watcher) handlePluginRegistration(socketPath string) error {
func (w *Watcher) handlePluginRegistration(ctx context.Context, socketPath string) error {
logger := klog.FromContext(ctx)
socketPath = getSocketPath(socketPath)
// Update desired state of world list of plugins
// If the socket path does exist in the desired world cache, there's still
// a possibility that it has been deleted and recreated again before it is
// removed from the desired world cache, so we still need to call AddOrUpdatePlugin
// in this case to update the timestamp
klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
logger.V(2).Info("Adding socket path or updating timestamp to desired state cache", "path", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(ctx, socketPath)
if err != nil {
return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
}
return nil
}
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
klog.V(6).InfoS("Handling delete event", "event", event)
func (w *Watcher) handleDeleteEvent(ctx context.Context, event fsnotify.Event) {
logger := klog.FromContext(ctx)
logger.V(6).Info("Handling delete event", "event", event)
socketPath := event.Name
klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath)
logger.V(2).Info("Removing socket path from desired state cache", "path", socketPath)
w.desiredStateOfWorld.RemovePlugin(socketPath)
}

View file

@ -30,6 +30,7 @@ import (
"k8s.io/klog/v2"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/test/utils/ktesting"
)
var (
@ -39,7 +40,8 @@ var (
func init() {
var logLevel string
klog.InitFlags(flag.CommandLine)
flags := &flag.FlagSet{}
klog.InitFlags(flags)
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
flag.StringVar(&logLevel, "logLevel", "6", "test")
flag.Lookup("v").Value.Set(logLevel)
@ -107,6 +109,7 @@ func TestPluginRegistration(t *testing.T) {
socketDir := initTempDir(t)
defer os.RemoveAll(socketDir)
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, socketDir, dsw, wait.NeverStop)
@ -115,7 +118,7 @@ func TestPluginRegistration(t *testing.T) {
pluginName := fmt.Sprintf("example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
@ -127,7 +130,7 @@ func TestPluginRegistration(t *testing.T) {
}
// Stop the plugin; the plugin should be removed from the desired state of world cache
require.NoError(t, p.Stop())
require.NoError(t, p.Stop(tCtx))
// The following doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
waitForUnregistration(t, pluginInfo.SocketPath, dsw)
dswPlugins = dsw.GetPluginsToRegister()
@ -141,6 +144,7 @@ func TestPluginRegistrationSameName(t *testing.T) {
socketDir := initTempDir(t)
defer os.RemoveAll(socketDir)
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, socketDir, dsw, wait.NeverStop)
@ -150,7 +154,7 @@ func TestPluginRegistrationSameName(t *testing.T) {
for i := 0; i < 10; i++ {
socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
@ -167,6 +171,7 @@ func TestPluginReRegistration(t *testing.T) {
socketDir := initTempDir(t)
defer os.RemoveAll(socketDir)
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, socketDir, dsw, wait.NeverStop)
@ -175,7 +180,7 @@ func TestPluginReRegistration(t *testing.T) {
socketPath := filepath.Join(socketDir, "plugin-reregistration.sock")
pluginName := "reregister-plugin"
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p)
lastTimestamp := time.Now()
waitForRegistration(t, pluginInfo.SocketPath, dsw)
@ -185,13 +190,13 @@ func TestPluginReRegistration(t *testing.T) {
for i := 0; i < 10; i++ {
// Stop the plugin; the plugin should be removed from the desired state of world cache
// The plugin removal doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
require.NoError(t, p.Stop())
require.NoError(t, p.Stop(tCtx))
waitForUnregistration(t, pluginInfo.SocketPath, dsw)
// Add the plugin again
pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
waitForRegistration(t, pluginInfo.SocketPath, dsw)
// Check the dsw cache. The updated plugin should be the only plugin in it
@ -210,6 +215,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
socketDir := initTempDir(t)
defer os.RemoveAll(socketDir)
tCtx := ktesting.Init(t)
plugins := make([]*examplePlugin, 10)
for i := 0; i < len(plugins); i++ {
@ -217,9 +223,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
pluginName := fmt.Sprintf("example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
defer func(p *examplePlugin) {
require.NoError(t, p.Stop())
require.NoError(t, p.Stop(tCtx))
}(p)
plugins[i] = p
@ -255,8 +261,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
}
func newWatcher(t *testing.T, socketDir string, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
tCtx := ktesting.Init(t)
w := NewWatcher(socketDir, desiredStateOfWorldCache)
require.NoError(t, w.Start(stopCh))
require.NoError(t, w.Start(tCtx, stopCh))
return w
}

View file

@ -20,6 +20,7 @@ limitations under the License.
package reconciler
import (
"context"
"sync"
"time"
@ -83,8 +84,12 @@ type reconciler struct {
var _ Reconciler = &reconciler{}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
wait.Until(func() {
rc.reconcile()
rc.reconcile(ctx)
},
rc.loopSleepDuration,
stopCh)
@ -108,7 +113,8 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
return copyHandlers
}
func (rc *reconciler) reconcile() {
func (rc *reconciler) reconcile(ctx context.Context) {
logger := klog.FromContext(ctx)
// Unregisterations are triggered before registrations
// Ensure plugins that should be unregistered are unregistered.
@ -123,7 +129,7 @@ func (rc *reconciler) reconcile() {
// with the same socket path but different timestamp.
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID {
klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
logger.V(5).Info("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
unregisterPlugin = true
break
}
@ -131,17 +137,17 @@ func (rc *reconciler) reconcile() {
}
if unregisterPlugin {
klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
logger.V(5).Info("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)
err := rc.operationExecutor.UnregisterPlugin(ctx, registeredPlugin, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)
logger.Error(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)
}
if err == nil {
klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)
logger.V(1).Info("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)
}
}
}
@ -149,16 +155,16 @@ func (rc *reconciler) reconcile() {
// Ensure plugins that should be registered are registered
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) {
klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
logger.V(5).Info("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
err := rc.operationExecutor.RegisterPlugin(ctx, pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)
logger.Error(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)
}
if err == nil {
klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)
logger.V(1).Info("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)
}
}
}

View file

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@ -170,6 +171,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
func Test_Run_Positive_Register(t *testing.T) {
defer cleanup(t)
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
di := NewDummyImpl()
@ -192,11 +194,12 @@ func Test_Run_Positive_Register(t *testing.T) {
socketPath := filepath.Join(socketDir, "plugin.sock")
pluginName := fmt.Sprintf("example-plugin")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
defer func() {
require.NoError(t, p.Stop())
require.NoError(t, p.Stop(tCtx))
}()
dsw.AddOrUpdatePlugin(socketPath)
require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath))
plugins := dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
@ -218,6 +221,7 @@ func Test_Run_Positive_Register(t *testing.T) {
func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
defer cleanup(t)
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
di := NewDummyImpl()
@ -241,8 +245,8 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
socketPath := filepath.Join(socketDir, "plugin.sock")
pluginName := fmt.Sprintf("example-plugin")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
dsw.AddOrUpdatePlugin(socketPath)
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath))
plugins := dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
@ -274,6 +278,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
func Test_Run_Positive_ReRegister(t *testing.T) {
defer cleanup(t)
tCtx := ktesting.Init(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
di := NewDummyImpl()
@ -297,13 +302,13 @@ func Test_Run_Positive_ReRegister(t *testing.T) {
socketPath := filepath.Join(socketDir, "plugin2.sock")
pluginName := fmt.Sprintf("example-plugin2")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
dsw.AddOrUpdatePlugin(socketPath)
require.NoError(t, p.Serve(tCtx, "v1beta1", "v1beta2"))
require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath))
plugins := dsw.GetPluginsToRegister()
waitForRegistration(t, socketPath, plugins[0].UUID, asw)
// Add the plugin again to update the timestamp
dsw.AddOrUpdatePlugin(socketPath)
require.NoError(t, dsw.AddOrUpdatePlugin(tCtx, socketPath))
// This should trigger a deregistration and a regitration
// The process of unregistration and reregistration can happen so fast that
// we are not able to catch it with waitForUnregistration, so here we are checking