pluginmanager: fix handling registration failures

Updated the pluginmanager to handle registration failures better:
the plugin is now removed from the actual state of the world when
registration or notification fails, and it is additionally deregistered
from its handler when notification fails.
Added a new e2e test to verify that the kubelet plugin manager retries
plugin registration when NotifyRegistrationStatus call fails.
This commit is contained in:
Ed Bartosh 2025-07-31 13:27:35 +03:00
parent 5e2ad84f67
commit 3156c2a2ef
6 changed files with 82 additions and 3 deletions

View file

@ -130,11 +130,14 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
logger.Error(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
}
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil {
actualStateOfWorldUpdater.RemovePlugin(socketPath)
return og.notifyPlugin(ctx, client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
if err := og.notifyPlugin(ctx, client, true, ""); err != nil {
actualStateOfWorldUpdater.RemovePlugin(socketPath)
handler.DeRegisterPlugin(infoResp.Name, infoResp.Endpoint)
return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
}
return nil

View file

@ -811,6 +811,13 @@ func (d *Helper) SetGetInfoError(err error) {
d.registrar.setGetInfoError(err)
}
// SetNotifyRegistrationStatusError configures the registration server
// to make NotifyRegistrationStatus calls return the specified error.
// The error is forwarded unchanged to the helper's registrar.
// To restore normal behavior, call SetNotifyRegistrationStatusError(nil).
func (d *Helper) SetNotifyRegistrationStatusError(err error) {
d.registrar.setNotifyRegistrationStatusError(err)
}
// serializeGRPCIfEnabled locks a mutex if serialization is enabled.
// Either way it returns a method that the caller must invoke
// via defer.

View file

@ -31,7 +31,8 @@ type registrationServer struct {
supportedVersions []string
status *registerapi.RegistrationStatus
getInfoError atomic.Pointer[error]
getInfoError atomic.Pointer[error]
notifyRegistrationStatusError atomic.Pointer[error]
registerapi.UnsafeRegistrationServer
}
@ -53,6 +54,9 @@ func (e *registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoR
// NotifyRegistrationStatus is the RPC invoked by plugin watcher.
func (e *registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
if err := e.getNotifyRegistrationStatusError(); err != nil {
return nil, err
}
e.status = status
if !status.PluginRegistered {
return nil, fmt.Errorf("failed registration process: %+v", status.Error)
@ -69,6 +73,14 @@ func (e *registrationServer) getGetInfoError() error {
return *errPtr
}
// getNotifyRegistrationStatusError returns the error currently stored in
// notifyRegistrationStatusError, or nil when no error pointer is stored.
func (e *registrationServer) getNotifyRegistrationStatusError() error {
	if stored := e.notifyRegistrationStatusError.Load(); stored != nil {
		return *stored
	}
	return nil
}
// setGetInfoError sets the error to be returned by the GetInfo handler of the registration server.
// If a non-nil error is provided, subsequent GetInfo calls will return this error.
// Passing nil as the err argument will clear any previously set error, effectively disabling erroring.
@ -79,3 +91,15 @@ func (e *registrationServer) setGetInfoError(err error) {
}
e.getInfoError.Store(&err)
}
// setNotifyRegistrationStatusError records the error that the
// NotifyRegistrationStatus handler of the registration server reads via
// getNotifyRegistrationStatusError.
// A non-nil err is stored by pointer; a nil err stores a nil pointer,
// which clears any previously configured error.
func (e *registrationServer) setNotifyRegistrationStatusError(err error) {
	// Leave the pointer nil when clearing so that readers observe "no error".
	var stored *error
	if err != nil {
		stored = &err
	}
	e.notifyRegistrationStatusError.Store(stored)
}

View file

@ -594,6 +594,17 @@ func (ex *ExamplePlugin) SetGetInfoError(err error) {
ex.d.SetGetInfoError(err)
}
// SetNotifyRegistrationStatusError sets an error to be returned by the
// plugin's NotifyRegistrationStatus call.
// This can be used in tests to simulate a registration failure scenario,
// allowing verification that the kubelet plugin manager retries registration
// when NotifyRegistrationStatus fails.
//
// The error is passed through unchanged to ex.d.
//
// To restore normal NotifyRegistrationStatus behavior, call SetNotifyRegistrationStatusError(nil).
func (ex *ExamplePlugin) SetNotifyRegistrationStatusError(err error) {
ex.d.SetNotifyRegistrationStatusError(err)
}
func (ex *ExamplePlugin) NodeWatchResources(req *drahealthv1alpha1.NodeWatchResourcesRequest, srv drahealthv1alpha1.DRAResourceHealth_NodeWatchResourcesServer) error {
logger := klog.FromContext(srv.Context())
logger.V(3).Info("Starting dynamic NodeWatchResources stream")

View file

@ -45,6 +45,17 @@ func GetInfoFailed() gcustom.CustomGomegaMatcher {
}).WithMessage("contain unsuccessful GetInfo call")
}
// NotifyRegistrationStatusFailed returns a matcher that succeeds when the
// recorded gRPC calls contain at least one NotifyRegistrationStatus call
// whose Err field is non-nil.
func NotifyRegistrationStatusFailed() gcustom.CustomGomegaMatcher {
	const notifyMethod = "/pluginregistration.Registration/NotifyRegistrationStatus"

	hasFailedNotify := func(actualCalls []testdriver.GRPCCall) (bool, error) {
		for i := range actualCalls {
			if actualCalls[i].Err == nil {
				continue
			}
			if actualCalls[i].FullMethod == notifyMethod {
				return true, nil
			}
		}
		return false, nil
	}

	return gcustom.MakeMatcher(hasFailedNotify).WithMessage("contain unsuccessful NotifyRegistrationStatus call")
}
// NodePrepareResourcesSucceeded checks that NodePrepareResources API has been called and succeeded
var NodePrepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []testdriver.GRPCCall) (bool, error) {
for _, call := range actualCalls {

View file

@ -144,17 +144,40 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
ginkgo.By("restart Kubelet")
restartKubelet(ctx, true)
ginkgo.By("wait for Registration call to fail")
ginkgo.By("wait for GetInfo call to fail")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.GetInfoFailed())
gomega.Expect(kubeletPlugin.GetGRPCCalls()).ShouldNot(testdrivergomega.BeRegistered, "Expect plugin not to be registered due to GetInfo failure")
ginkgo.By("unset registration failure mode")
ginkgo.By("unset GetInfo failure mode")
kubeletPlugin.SetGetInfoError(nil)
ginkgo.By("wait for Kubelet plugin re-registration")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
})
// Test that the kubelet plugin manager retries plugin registration
// when the NotifyRegistrationStatus call fails, and succeeds once the call passes.
//
// Sequence: inject the failure, restart the kubelet, observe a failed
// NotifyRegistrationStatus call without a successful registration, clear the
// failure, then wait for the plugin to be registered.
ginkgo.It("must recover and register after NotifyRegistrationStatus failure", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName)
ginkgo.By("set NotifyRegistrationStatus failure mode")
// The failure is configured before the kubelet restart below, so it is
// already in place when the kubelet comes back up.
kubeletPlugin.SetNotifyRegistrationStatusError(fmt.Errorf("simulated NotifyRegistrationStatus failure"))
// NOTE(review): presumably clears the recorded gRPC calls so the assertions
// below only see calls made after this point -- confirm against ResetGRPCCalls.
kubeletPlugin.ResetGRPCCalls()
ginkgo.By("restart Kubelet")
restartKubelet(ctx, true)
ginkgo.By("wait for NotifyRegistrationStatus call to fail")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(retryTestTimeout).Should(testdrivergomega.NotifyRegistrationStatusFailed())
// One-shot check right after the failure was observed: the plugin must not
// count as registered while the failure mode is still active.
gomega.Expect(kubeletPlugin.GetGRPCCalls()).ShouldNot(testdrivergomega.BeRegistered, "Expect plugin not to be registered due to NotifyRegistrationStatus failure")
ginkgo.By("unset NotifyRegistrationStatus failure mode")
kubeletPlugin.SetNotifyRegistrationStatusError(nil)
ginkgo.By("wait for Kubelet plugin re-registration")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
})
ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) {
newKubeletPlugin(ctx, f.ClientSet, f.Namespace.Name, getNodeName(ctx, f), driverName)