From 3156c2a2efb8a16dd1c9704b995ba7568ed7b604 Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Thu, 31 Jul 2025 13:27:35 +0300 Subject: [PATCH] pluginmanager: fix handling registration failures Updated the pluginmanager to handle registration failures better by removing the plugin from the actual state of the world and deregistering when registration or notification fails. Added a new e2e test to verify that the kubelet plugin manager retries plugin registration when NotifyRegistrationStatus call fails. --- .../operationexecutor/operation_generator.go | 3 +++ .../kubeletplugin/draplugin.go | 7 +++++ .../kubeletplugin/registrationserver.go | 26 +++++++++++++++++- test/e2e/dra/test-driver/app/kubeletplugin.go | 11 ++++++++ test/e2e/dra/test-driver/gomega/matchers.go | 11 ++++++++ test/e2e_node/dra_test.go | 27 +++++++++++++++++-- 6 files changed, 82 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 745ca231547..725197323dd 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -130,11 +130,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( logger.Error(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) } if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil { + actualStateOfWorldUpdater.RemovePlugin(socketPath) return og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) } // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate if err := og.notifyPlugin(ctx, client, true, ""); err != nil { + actualStateOfWorldUpdater.RemovePlugin(socketPath) + handler.DeRegisterPlugin(infoResp.Name, infoResp.Endpoint) return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err) } return nil diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index a8e03521635..e5c2304d6bd 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -811,6 +811,13 @@ func (d *Helper) SetGetInfoError(err error) { d.registrar.setGetInfoError(err) } +// SetNotifyRegistrationStatusError configures the registration server +// to make NotifyRegistrationStatus calls return the specified error. +// To restore normal behavior, call SetNotifyRegistrationStatusError(nil). +func (d *Helper) SetNotifyRegistrationStatusError(err error) { + d.registrar.setNotifyRegistrationStatusError(err) +} + // serializeGRPCIfEnabled locks a mutex if serialization is enabled. // Either way it returns a method that the caller must invoke // via defer. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go index e766ad4e467..8c700b87fd8 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go @@ -31,7 +31,8 @@ type registrationServer struct { supportedVersions []string status *registerapi.RegistrationStatus - getInfoError atomic.Pointer[error] + getInfoError atomic.Pointer[error] + notifyRegistrationStatusError atomic.Pointer[error] registerapi.UnsafeRegistrationServer } @@ -53,6 +54,9 @@ func (e *registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoR // NotifyRegistrationStatus is the RPC invoked by plugin watcher. func (e *registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) { + if err := e.getNotifyRegistrationStatusError(); err != nil { + return nil, err + } e.status = status if !status.PluginRegistered { return nil, fmt.Errorf("failed registration process: %+v", status.Error) @@ -69,6 +73,14 @@ func (e *registrationServer) getGetInfoError() error { return *errPtr } +func (e *registrationServer) getNotifyRegistrationStatusError() error { + errPtr := e.notifyRegistrationStatusError.Load() + if errPtr == nil { + return nil + } + return *errPtr +} + // setGetInfoError sets the error to be returned by the GetInfo handler of the registration server. // If a non-nil error is provided, subsequent GetInfo calls will return this error. // Passing nil as the err argument will clear any previously set error, effectively disabling erroring. @@ -79,3 +91,15 @@ func (e *registrationServer) setGetInfoError(err error) { } e.getInfoError.Store(&err) } + +// setNotifyRegistrationStatusError sets the error to be returned by the NotifyRegistrationStatus handler +// of the registration server. +// If a non-nil error is provided, subsequent NotifyRegistrationStatus calls will return this error. +// Passing nil as the err argument will clear any previously set error, effectively disabling erroring. +func (e *registrationServer) setNotifyRegistrationStatusError(err error) { + if err == nil { + e.notifyRegistrationStatusError.Store(nil) + return + } + e.notifyRegistrationStatusError.Store(&err) +} diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 59eb71397ab..6f2aa372767 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -594,6 +594,17 @@ func (ex *ExamplePlugin) SetGetInfoError(err error) { ex.d.SetGetInfoError(err) } +// SetNotifyRegistrationStatusError sets an error to be returned by the +// plugin's NotifyRegistrationStatus call. +// This can be used in tests to simulate a registration failure scenario, +// allowing verification that the kubelet plugin manager retries registration +// when NotifyRegistrationStatus fails. +// +// To restore normal NotifyRegistrationStatus behavior, call SetNotifyRegistrationStatusError(nil). +func (ex *ExamplePlugin) SetNotifyRegistrationStatusError(err error) { + ex.d.SetNotifyRegistrationStatusError(err) +} + func (ex *ExamplePlugin) NodeWatchResources(req *drahealthv1alpha1.NodeWatchResourcesRequest, srv drahealthv1alpha1.DRAResourceHealth_NodeWatchResourcesServer) error { logger := klog.FromContext(srv.Context()) logger.V(3).Info("Starting dynamic NodeWatchResources stream") diff --git a/test/e2e/dra/test-driver/gomega/matchers.go b/test/e2e/dra/test-driver/gomega/matchers.go index c85a456f686..ca76248025a 100644 --- a/test/e2e/dra/test-driver/gomega/matchers.go +++ b/test/e2e/dra/test-driver/gomega/matchers.go @@ -45,6 +45,17 @@ func GetInfoFailed() gcustom.CustomGomegaMatcher { }).WithMessage("contain unsuccessful GetInfo call") } +func NotifyRegistrationStatusFailed() gcustom.CustomGomegaMatcher { + return gcustom.MakeMatcher(func(actualCalls []testdriver.GRPCCall) (bool, error) { + for _, call := range actualCalls { + if call.FullMethod == "/pluginregistration.Registration/NotifyRegistrationStatus" && call.Err != nil { + return true, nil + } + } + return false, nil + }).WithMessage("contain unsuccessful NotifyRegistrationStatus call") +} + // NodePrepareResoucesSucceeded checks that NodePrepareResources API has been called and succeeded var NodePrepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []testdriver.GRPCCall) (bool, error) { for _, call := range actualCalls { diff --git a/test/e2e_node/dra_test.go b/test/e2e_node/dra_test.go index 183bcf50987..6938a9138ff 100644 --- a/test/e2e_node/dra_test.go +++ b/test/e2e_node/dra_test.go @@ -144,17 +144,40 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami ginkgo.By("restart Kubelet") restartKubelet(ctx, true) - ginkgo.By("wait for Registration call to fail") + ginkgo.By("wait for GetInfo call to fail") gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.GetInfoFailed()) gomega.Expect(kubeletPlugin.GetGRPCCalls()).ShouldNot(testdrivergomega.BeRegistered, "Expect plugin not to be registered due to GetInfo failure") - ginkgo.By("unset registration failure mode") + ginkgo.By("unset GetInfo failure mode") kubeletPlugin.SetGetInfoError(nil) ginkgo.By("wait for Kubelet plugin re-registration") gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) }) + // Test that the kubelet plugin manager retries plugin registration + // when the NotifyRegistrationStatus call fails, and succeeds once the call passes. + ginkgo.It("must recover and register after NotifyRegistrationStatus failure", func(ctx context.Context) { + kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName) + + ginkgo.By("set NotifyRegistrationStatus failure mode") + kubeletPlugin.SetNotifyRegistrationStatusError(fmt.Errorf("simulated NotifyRegistrationStatus failure")) + kubeletPlugin.ResetGRPCCalls() + + ginkgo.By("restart Kubelet") + restartKubelet(ctx, true) + + ginkgo.By("wait for NotifyRegistrationStatus call to fail") + gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.NotifyRegistrationStatusFailed()) + gomega.Expect(kubeletPlugin.GetGRPCCalls()).ShouldNot(testdrivergomega.BeRegistered, "Expect plugin not to be registered due to NotifyRegistrationStatus failure") + + ginkgo.By("unset NotifyRegistrationStatus failure mode") + kubeletPlugin.SetNotifyRegistrationStatusError(nil) + + ginkgo.By("wait for Kubelet plugin re-registration") + gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) + }) + ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) { newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName)