diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 745ca231547..725197323dd 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -130,11 +130,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( logger.Error(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) } if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil { + actualStateOfWorldUpdater.RemovePlugin(socketPath) return og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) } // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate if err := og.notifyPlugin(ctx, client, true, ""); err != nil { + actualStateOfWorldUpdater.RemovePlugin(socketPath) + handler.DeRegisterPlugin(infoResp.Name, infoResp.Endpoint) return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err) } return nil diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index a8e03521635..e5c2304d6bd 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -811,6 +811,13 @@ func (d *Helper) SetGetInfoError(err error) { d.registrar.setGetInfoError(err) } +// SetNotifyRegistrationStatusError configures the registration server +// to make NotifyRegistrationStatus calls return the specified error. +// To restore normal behavior, call SetNotifyRegistrationStatusError(nil). +func (d *Helper) SetNotifyRegistrationStatusError(err error) { + d.registrar.setNotifyRegistrationStatusError(err) +} + // serializeGRPCIfEnabled locks a mutex if serialization is enabled. // Either way it returns a method that the caller must invoke // via defer. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go index e766ad4e467..8c700b87fd8 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/registrationserver.go @@ -31,7 +31,8 @@ type registrationServer struct { supportedVersions []string status *registerapi.RegistrationStatus - getInfoError atomic.Pointer[error] + getInfoError atomic.Pointer[error] + notifyRegistrationStatusError atomic.Pointer[error] registerapi.UnsafeRegistrationServer } @@ -53,6 +54,9 @@ func (e *registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoR // NotifyRegistrationStatus is the RPC invoked by plugin watcher. func (e *registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) { + if err := e.getNotifyRegistrationStatusError(); err != nil { + return nil, err + } e.status = status if !status.PluginRegistered { return nil, fmt.Errorf("failed registration process: %+v", status.Error) @@ -69,6 +73,14 @@ func (e *registrationServer) getGetInfoError() error { return *errPtr } +func (e *registrationServer) getNotifyRegistrationStatusError() error { + errPtr := e.notifyRegistrationStatusError.Load() + if errPtr == nil { + return nil + } + return *errPtr +} + // setGetInfoError sets the error to be returned by the GetInfo handler of the registration server. // If a non-nil error is provided, subsequent GetInfo calls will return this error. // Passing nil as the err argument will clear any previously set error, effectively disabling erroring. @@ -79,3 +91,15 @@ func (e *registrationServer) setGetInfoError(err error) { } e.getInfoError.Store(&err) } + +// setNotifyRegistrationStatusError sets the error to be returned by the NotifyRegistrationStatus handler +// of the registration server. +// If a non-nil error is provided, subsequent NotifyRegistrationStatus calls will return this error. +// Passing nil as the err argument will clear any previously set error, effectively disabling erroring. +func (e *registrationServer) setNotifyRegistrationStatusError(err error) { + if err == nil { + e.notifyRegistrationStatusError.Store(nil) + return + } + e.notifyRegistrationStatusError.Store(&err) +} diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 59eb71397ab..6f2aa372767 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -594,6 +594,17 @@ func (ex *ExamplePlugin) SetGetInfoError(err error) { ex.d.SetGetInfoError(err) } +// SetNotifyRegistrationStatusError sets an error to be returned by the +// plugin's NotifyRegistrationStatus call. +// This can be used in tests to simulate a registration failure scenario, +// allowing verification that the kubelet plugin manager retries registration +// when NotifyRegistrationStatus fails. +// +// To restore normal NotifyRegistrationStatus behavior, call SetNotifyRegistrationStatusError(nil). +func (ex *ExamplePlugin) SetNotifyRegistrationStatusError(err error) { + ex.d.SetNotifyRegistrationStatusError(err) +} + func (ex *ExamplePlugin) NodeWatchResources(req *drahealthv1alpha1.NodeWatchResourcesRequest, srv drahealthv1alpha1.DRAResourceHealth_NodeWatchResourcesServer) error { logger := klog.FromContext(srv.Context()) logger.V(3).Info("Starting dynamic NodeWatchResources stream") diff --git a/test/e2e/dra/test-driver/gomega/matchers.go b/test/e2e/dra/test-driver/gomega/matchers.go index c85a456f686..ca76248025a 100644 --- a/test/e2e/dra/test-driver/gomega/matchers.go +++ b/test/e2e/dra/test-driver/gomega/matchers.go @@ -45,6 +45,17 @@ func GetInfoFailed() gcustom.CustomGomegaMatcher { }).WithMessage("contain unsuccessful GetInfo call") } +func NotifyRegistrationStatusFailed() gcustom.CustomGomegaMatcher { + return gcustom.MakeMatcher(func(actualCalls []testdriver.GRPCCall) (bool, error) { + for _, call := range actualCalls { + if call.FullMethod == "/pluginregistration.Registration/NotifyRegistrationStatus" && call.Err != nil { + return true, nil + } + } + return false, nil + }).WithMessage("contain unsuccessful NotifyRegistrationStatus call") +} + // NodePrepareResoucesSucceeded checks that NodePrepareResources API has been called and succeeded var NodePrepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []testdriver.GRPCCall) (bool, error) { for _, call := range actualCalls { diff --git a/test/e2e_node/dra_test.go b/test/e2e_node/dra_test.go index 183bcf50987..6938a9138ff 100644 --- a/test/e2e_node/dra_test.go +++ b/test/e2e_node/dra_test.go @@ -144,17 +144,40 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami ginkgo.By("restart Kubelet") restartKubelet(ctx, true) - ginkgo.By("wait for Registration call to fail") + ginkgo.By("wait for GetInfo call to fail") gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.GetInfoFailed()) gomega.Expect(kubeletPlugin.GetGRPCCalls()).ShouldNot(testdrivergomega.BeRegistered, "Expect plugin not to be registered due to GetInfo failure") - ginkgo.By("unset registration failure mode") + ginkgo.By("unset GetInfo failure mode") kubeletPlugin.SetGetInfoError(nil) ginkgo.By("wait for Kubelet plugin re-registration") gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) }) + // Test that the kubelet plugin manager retries plugin registration + // when the NotifyRegistrationStatus call fails, and succeeds once the call passes. + ginkgo.It("must recover and register after NotifyRegistrationStatus failure", func(ctx context.Context) { + kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName) + + ginkgo.By("set NotifyRegistrationStatus failure mode") + kubeletPlugin.SetNotifyRegistrationStatusError(fmt.Errorf("simulated NotifyRegistrationStatus failure")) + kubeletPlugin.ResetGRPCCalls() + + ginkgo.By("restart Kubelet") + restartKubelet(ctx, true) + + ginkgo.By("wait for NotifyRegistrationStatus call to fail") + gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.NotifyRegistrationStatusFailed()) + gomega.Expect(kubeletPlugin.GetGRPCCalls()).ShouldNot(testdrivergomega.BeRegistered, "Expect plugin not to be registered due to NotifyRegistrationStatus failure") + + ginkgo.By("unset NotifyRegistrationStatus failure mode") + kubeletPlugin.SetNotifyRegistrationStatusError(nil) + + ginkgo.By("wait for Kubelet plugin re-registration") + gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) + }) + ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) { newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName)