From ad6c155449e38d7cca22230fdb99ae02b65ccb59 Mon Sep 17 00:00:00 2001 From: Carlos Eduardo Arango Gutierrez Date: Mon, 10 Nov 2025 15:50:09 -0500 Subject: [PATCH] KEP-4680: Add message field support to DRA device health reporting Author: Carlos Eduardo Arango Gutierrez Co-Authored-By: Harshal Patil <12152047+harche@users.noreply.github.com> Signed-off-by: Harshal Patil <12152047+harche@users.noreply.github.com> --- api/openapi-spec/swagger.json | 4 + api/openapi-spec/v3/api__v1_openapi.json | 4 + pkg/api/pod/util.go | 75 ++- pkg/api/pod/util_test.go | 630 ++++++++++++++++++ pkg/apis/core/types.go | 3 + pkg/apis/core/v1/zz_generated.conversion.go | 2 + pkg/apis/core/validation/validation.go | 8 + pkg/apis/core/validation/validation_test.go | 97 +++ pkg/apis/core/zz_generated.deepcopy.go | 9 +- pkg/features/kube_features.go | 13 + pkg/generated/openapi/zz_generated.openapi.go | 7 + pkg/kubelet/cm/dra/healthinfo.go | 19 +- pkg/kubelet/cm/dra/healthinfo_test.go | 24 +- pkg/kubelet/cm/dra/manager.go | 67 +- pkg/kubelet/cm/dra/manager_test.go | 217 +++++- .../cm/dra/plugin/dra_plugin_manager.go | 2 +- pkg/kubelet/cm/dra/state/state.go | 4 + .../src/k8s.io/api/core/v1/generated.pb.go | 45 ++ .../src/k8s.io/api/core/v1/generated.proto | 6 + staging/src/k8s.io/api/core/v1/types.go | 9 + .../core/v1/types_swagger_doc_generated.go | 1 + .../api/core/v1/zz_generated.deepcopy.go | 9 +- .../k8s.io/api/testdata/HEAD/core.v1.Pod.json | 9 +- .../k8s.io/api/testdata/HEAD/core.v1.Pod.pb | Bin 12971 -> 13013 bytes .../k8s.io/api/testdata/HEAD/core.v1.Pod.yaml | 3 + .../HEAD/core.v1.PodStatusResult.json | 9 +- .../testdata/HEAD/core.v1.PodStatusResult.pb | Bin 2446 -> 2488 bytes .../HEAD/core.v1.PodStatusResult.yaml | 3 + .../core/v1/resourcehealth.go | 11 + .../applyconfigurations/internal/internal.go | 3 + .../pkg/apis/dra-health/v1alpha1/api.pb.go | 61 +- .../pkg/apis/dra-health/v1alpha1/api.proto | 5 + .../reference/feature_list.md | 3 +- .../reference/versioned_feature_list.yaml | 10 + test/e2e/dra/dra.go | 187 ++++++ test/e2e/dra/test-driver/app/kubeletplugin.go | 20 +- test/e2e/dra/utils/deploy.go | 1 + 37 files changed, 1507 insertions(+), 73 deletions(-) diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 081a2320321..9b4c9c6c40f 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -11283,6 +11283,10 @@ "description": "Health of the resource. can be one of:\n - Healthy: operates as normal\n - Unhealthy: reported unhealthy. We consider this a temporary health issue\n since we do not have a mechanism today to distinguish\n temporary and permanent issues.\n - Unknown: The status cannot be determined.\n For example, Device Plugin got unregistered and hasn't been re-registered since.\n\nIn future we may want to introduce the PermanentlyUnhealthy Status.", "type": "string" }, + "message": { + "description": "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.", + "type": "string" + }, "resourceID": { "description": "ResourceID is the unique identifier of the resource. See the ResourceID type for more information.", "type": "string" diff --git a/api/openapi-spec/v3/api__v1_openapi.json b/api/openapi-spec/v3/api__v1_openapi.json index 532ccd7a523..95c5398f742 100644 --- a/api/openapi-spec/v3/api__v1_openapi.json +++ b/api/openapi-spec/v3/api__v1_openapi.json @@ -6941,6 +6941,10 @@ "description": "Health of the resource. can be one of:\n - Healthy: operates as normal\n - Unhealthy: reported unhealthy. We consider this a temporary health issue\n since we do not have a mechanism today to distinguish\n temporary and permanent issues.\n - Unknown: The status cannot be determined.\n For example, Device Plugin got unregistered and hasn't been re-registered since.\n\nIn future we may want to introduce the PermanentlyUnhealthy Status.", "type": "string" }, + "message": { + "description": "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.", + "type": "string" + }, "resourceID": { "default": "", "description": "ResourceID is the unique identifier of the resource. See the ResourceID type for more information.", diff --git a/pkg/api/pod/util.go b/pkg/api/pod/util.go index 3f4af333621..9f20394f051 100644 --- a/pkg/api/pod/util.go +++ b/pkg/api/pod/util.go @@ -1056,7 +1056,7 @@ func dropDisabledPodStatusFields(podStatus, oldPodStatus *api.PodStatus, podSpec } } - if !utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { + if !utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) && !resourceHealthStatusInUse(oldPodStatus) { setAllocatedResourcesStatusToNil := func(csl []api.ContainerStatus) { for i := range csl { csl[i].AllocatedResourcesStatus = nil @@ -1067,6 +1067,21 @@ func dropDisabledPodStatusFields(podStatus, oldPodStatus *api.PodStatus, podSpec setAllocatedResourcesStatusToNil(podStatus.EphemeralContainerStatuses) } + if !utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatusMessage) && !resourceHealthStatusMessageInUse(oldPodStatus) { + dropMessageField := func(csl []api.ContainerStatus) { + for i := range csl { + for j := range csl[i].AllocatedResourcesStatus { + for k := range csl[i].AllocatedResourcesStatus[j].Resources { + csl[i].AllocatedResourcesStatus[j].Resources[k].Message = nil + } + } + } + } + dropMessageField(podStatus.ContainerStatuses) + dropMessageField(podStatus.InitContainerStatuses) + dropMessageField(podStatus.EphemeralContainerStatuses) + } + // drop ContainerStatus.User field to empty (disable SupplementalGroupsPolicy) if !utilfeature.DefaultFeatureGate.Enabled(features.SupplementalGroupsPolicy) && !supplementalGroupsPolicyInUse(oldPodSpec) { dropUserField := func(csl []api.ContainerStatus) { @@ -1111,6 +1126,64 @@ func draExendedResourceInUse(podStatus *api.PodStatus) bool { return false } +func resourceHealthStatusInUse(podStatus *api.PodStatus) bool { + if podStatus == nil { + return false + } + + checkContainerStatuses := func(csl []api.ContainerStatus) bool { + for _, cs := range csl { + if len(cs.AllocatedResourcesStatus) > 0 { + return true + } + } + return false + } + + if checkContainerStatuses(podStatus.ContainerStatuses) { + return true + } + if checkContainerStatuses(podStatus.InitContainerStatuses) { + return true + } + if checkContainerStatuses(podStatus.EphemeralContainerStatuses) { + return true + } + + return false +} + +func resourceHealthStatusMessageInUse(podStatus *api.PodStatus) bool { + if podStatus == nil { + return false + } + + checkContainerStatuses := func(csl []api.ContainerStatus) bool { + for _, cs := range csl { + for _, rs := range cs.AllocatedResourcesStatus { + for _, rh := range rs.Resources { + if rh.Message != nil { + return true + } + } + } + } + return false + } + + if checkContainerStatuses(podStatus.ContainerStatuses) { + return true + } + if checkContainerStatuses(podStatus.InitContainerStatuses) { + return true + } + if checkContainerStatuses(podStatus.EphemeralContainerStatuses) { + return true + } + + return false +} + func dynamicResourceAllocationInUse(podSpec *api.PodSpec) bool { // We only need to check this field because the containers cannot have // resource requirements entries for claims without a corresponding diff --git a/pkg/api/pod/util_test.go b/pkg/api/pod/util_test.go index 3701535c049..de65f06d09b 100644 --- a/pkg/api/pod/util_test.go +++ b/pkg/api/pod/util_test.go @@ -6734,3 +6734,633 @@ func TestDropDisabledPodStatusFields_InPlacePodLevelResourcesVerticalScaling(t * }) } } + +func TestResourceHealthStatusInUse(t *testing.T) { + testCases := []struct { + name string + podStatus *api.PodStatus + expected bool + }{ + { + name: "nil pod status", + podStatus: nil, + expected: false, + }, + { + name: "empty pod status", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{}, + }, + expected: false, + }, + { + name: "pod status with AllocatedResourcesStatus in container", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "test-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "pod status with AllocatedResourcesStatus in init container", + podStatus: &api.PodStatus{ + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "pod status with AllocatedResourcesStatus in ephemeral container", + podStatus: &api.PodStatus{ + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "pod status without AllocatedResourcesStatus", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "test-container", + }, + }, + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + }, + }, + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + }, + }, + }, + expected: false, + }, + { + name: "pod status with empty AllocatedResourcesStatus array", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "test-container", + AllocatedResourcesStatus: []api.ResourceStatus{}, + }, + }, + }, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := resourceHealthStatusInUse(tc.podStatus) + if result != tc.expected { + t.Errorf("resourceHealthStatusInUse() = %v, want %v", result, tc.expected) + } + }) + } +} + +func TestDropDisabledPodStatusFields_ResourceHealthStatus(t *testing.T) { + podStatusWithResourceHealth := func() *api.PodStatus { + return &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "container1", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + }, + }, + }, + }, + }, + }, + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/gpu", + Resources: []api.ResourceHealth{ + { + ResourceID: "gpu-1", + Health: api.ResourceHealthStatusUnhealthy, + }, + }, + }, + }, + }, + }, + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/nic", + Resources: []api.ResourceHealth{ + { + ResourceID: "nic-1", + Health: api.ResourceHealthStatusHealthy, + }, + }, + }, + }, + }, + }, + } + } + + podStatusWithoutResourceHealth := func() *api.PodStatus { + return &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "container1", + }, + }, + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + }, + }, + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + }, + }, + } + } + + podStatusNilResourceHealth := func() *api.PodStatus { + return &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "container1", + AllocatedResourcesStatus: nil, + }, + }, + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + AllocatedResourcesStatus: nil, + }, + }, + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + AllocatedResourcesStatus: nil, + }, + }, + } + } + + tests := []struct { + name string + enabled bool + podStatus *api.PodStatus + oldPodStatus *api.PodStatus + wantPodStatus *api.PodStatus + }{ + { + name: "feature enabled, old=without, new=without", + enabled: true, + oldPodStatus: podStatusWithoutResourceHealth(), + podStatus: podStatusWithoutResourceHealth(), + wantPodStatus: podStatusWithoutResourceHealth(), + }, + { + name: "feature enabled, old=with, new=with", + enabled: true, + oldPodStatus: podStatusWithResourceHealth(), + podStatus: podStatusWithResourceHealth(), + wantPodStatus: podStatusWithResourceHealth(), + }, + { + name: "feature enabled, old=without, new=with", + enabled: true, + oldPodStatus: podStatusWithoutResourceHealth(), + podStatus: podStatusWithResourceHealth(), + wantPodStatus: podStatusWithResourceHealth(), + }, + { + name: "feature disabled, old=without, new=without", + enabled: false, + oldPodStatus: podStatusWithoutResourceHealth(), + podStatus: podStatusWithoutResourceHealth(), + wantPodStatus: podStatusNilResourceHealth(), + }, + { + name: "feature disabled, old=without, new=with (should drop)", + enabled: false, + oldPodStatus: podStatusWithoutResourceHealth(), + podStatus: podStatusWithResourceHealth(), + wantPodStatus: podStatusNilResourceHealth(), + }, + { + name: "feature disabled, old=with, new=with (should preserve - bug fix)", + enabled: false, + oldPodStatus: podStatusWithResourceHealth(), + podStatus: podStatusWithResourceHealth(), + wantPodStatus: podStatusWithResourceHealth(), + }, + { + name: "feature disabled, old=with, new=without (should preserve nil)", + enabled: false, + oldPodStatus: podStatusWithResourceHealth(), + podStatus: podStatusWithoutResourceHealth(), + wantPodStatus: podStatusWithoutResourceHealth(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, tt.enabled) + dropDisabledPodStatusFields(tt.podStatus, tt.oldPodStatus, &api.PodSpec{}, &api.PodSpec{}) + if !reflect.DeepEqual(tt.podStatus, tt.wantPodStatus) { + t.Errorf("dropDisabledPodStatusFields() = %v, want %v\ndiff: %v", + tt.podStatus, tt.wantPodStatus, cmp.Diff(tt.wantPodStatus, tt.podStatus)) + } + }) + } +} + +func TestResourceHealthStatusMessageInUse(t *testing.T) { + message := "test message" + testCases := []struct { + name string + podStatus *api.PodStatus + expected bool + }{ + { + name: "nil pod status", + podStatus: nil, + expected: false, + }, + { + name: "empty pod status", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{}, + }, + expected: false, + }, + { + name: "pod status with message in container", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "test-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + Message: &message, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "pod status with message in init container", + podStatus: &api.PodStatus{ + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + Message: &message, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "pod status with message in ephemeral container", + podStatus: &api.PodStatus{ + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + Message: &message, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "pod status without message (nil pointer)", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "test-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + Message: nil, + }, + }, + }, + }, + }, + }, + }, + expected: false, + }, + { + name: "pod status with AllocatedResourcesStatus but no message", + podStatus: &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "test-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + }, + }, + }, + }, + }, + }, + }, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := resourceHealthStatusMessageInUse(tc.podStatus) + if result != tc.expected { + t.Errorf("resourceHealthStatusMessageInUse() = %v, want %v", result, tc.expected) + } + }) + } +} + +func TestDropDisabledPodStatusFields_ResourceHealthStatusMessage(t *testing.T) { + message1 := "ECC error detected" + message2 := "GPU temperature high" + message3 := "NIC link down" + + podStatusWithMessage := func() *api.PodStatus { + return &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "container1", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + Message: &message1, + }, + }, + }, + }, + }, + }, + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/gpu", + Resources: []api.ResourceHealth{ + { + ResourceID: "gpu-1", + Health: api.ResourceHealthStatusUnhealthy, + Message: &message2, + }, + }, + }, + }, + }, + }, + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/nic", + Resources: []api.ResourceHealth{ + { + ResourceID: "nic-1", + Health: api.ResourceHealthStatusHealthy, + Message: &message3, + }, + }, + }, + }, + }, + }, + } + } + + podStatusWithoutMessage := func() *api.PodStatus { + return &api.PodStatus{ + ContainerStatuses: []api.ContainerStatus{ + { + Name: "container1", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/device", + Resources: []api.ResourceHealth{ + { + ResourceID: "device-1", + Health: api.ResourceHealthStatusHealthy, + Message: nil, + }, + }, + }, + }, + }, + }, + InitContainerStatuses: []api.ContainerStatus{ + { + Name: "init-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/gpu", + Resources: []api.ResourceHealth{ + { + ResourceID: "gpu-1", + Health: api.ResourceHealthStatusUnhealthy, + Message: nil, + }, + }, + }, + }, + }, + }, + EphemeralContainerStatuses: []api.ContainerStatus{ + { + Name: "ephemeral-container", + AllocatedResourcesStatus: []api.ResourceStatus{ + { + Name: "example.com/nic", + Resources: []api.ResourceHealth{ + { + ResourceID: "nic-1", + Health: api.ResourceHealthStatusHealthy, + Message: nil, + }, + }, + }, + }, + }, + }, + } + } + + tests := []struct { + name string + enabled bool + podStatus *api.PodStatus + oldPodStatus *api.PodStatus + wantPodStatus *api.PodStatus + }{ + { + name: "feature enabled, old=without, new=without", + enabled: true, + oldPodStatus: podStatusWithoutMessage(), + podStatus: podStatusWithoutMessage(), + wantPodStatus: podStatusWithoutMessage(), + }, + { + name: "feature enabled, old=with, new=with", + enabled: true, + oldPodStatus: podStatusWithMessage(), + podStatus: podStatusWithMessage(), + wantPodStatus: podStatusWithMessage(), + }, + { + name: "feature enabled, old=without, new=with", + enabled: true, + oldPodStatus: podStatusWithoutMessage(), + podStatus: podStatusWithMessage(), + wantPodStatus: podStatusWithMessage(), + }, + { + name: "feature disabled, old=without, new=without", + enabled: false, + oldPodStatus: podStatusWithoutMessage(), + podStatus: podStatusWithoutMessage(), + wantPodStatus: podStatusWithoutMessage(), + }, + { + name: "feature disabled, old=without, new=with (should drop)", + enabled: false, + oldPodStatus: podStatusWithoutMessage(), + podStatus: podStatusWithMessage(), + wantPodStatus: podStatusWithoutMessage(), + }, + { + name: "feature disabled, old=with, new=with (should preserve - bug fix)", + enabled: false, + oldPodStatus: podStatusWithMessage(), + podStatus: podStatusWithMessage(), + wantPodStatus: podStatusWithMessage(), + }, + { + name: "feature disabled, old=with, new=without (should preserve nil)", + enabled: false, + oldPodStatus: podStatusWithMessage(), + podStatus: podStatusWithoutMessage(), + wantPodStatus: podStatusWithoutMessage(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Enable ResourceHealthStatus as well since ResourceHealthStatusMessage depends on it + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatusMessage, tt.enabled) + dropDisabledPodStatusFields(tt.podStatus, tt.oldPodStatus, &api.PodSpec{}, &api.PodSpec{}) + if !reflect.DeepEqual(tt.podStatus, tt.wantPodStatus) { + t.Errorf("dropDisabledPodStatusFields() = %v, want %v\ndiff: %v", + tt.podStatus, tt.wantPodStatus, cmp.Diff(tt.wantPodStatus, tt.podStatus)) + } + }) + } +} diff --git a/pkg/apis/core/types.go b/pkg/apis/core/types.go index d5fb3eae978..7f66cf35cd8 100644 --- a/pkg/apis/core/types.go +++ b/pkg/apis/core/types.go @@ -3075,6 +3075,9 @@ type ResourceHealth struct { // // In future we may want to introduce the PermanentlyUnhealthy Status. Health ResourceHealthStatus + // Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold"). + // This field is populated by the kubelet when ResourceHealthStatusMessage is enabled. + Message *string } // ContainerUser represents user identity information diff --git a/pkg/apis/core/v1/zz_generated.conversion.go b/pkg/apis/core/v1/zz_generated.conversion.go index ffda2683512..b43561c7dae 100644 --- a/pkg/apis/core/v1/zz_generated.conversion.go +++ b/pkg/apis/core/v1/zz_generated.conversion.go @@ -7838,6 +7838,7 @@ func Convert_core_ResourceFieldSelector_To_v1_ResourceFieldSelector(in *core.Res func autoConvert_v1_ResourceHealth_To_core_ResourceHealth(in *corev1.ResourceHealth, out *core.ResourceHealth, s conversion.Scope) error { out.ResourceID = core.ResourceID(in.ResourceID) out.Health = core.ResourceHealthStatus(in.Health) + out.Message = (*string)(unsafe.Pointer(in.Message)) return nil } @@ -7849,6 +7850,7 @@ func Convert_v1_ResourceHealth_To_core_ResourceHealth(in *corev1.ResourceHealth, func autoConvert_core_ResourceHealth_To_v1_ResourceHealth(in *core.ResourceHealth, out *corev1.ResourceHealth, s conversion.Scope) error { out.ResourceID = corev1.ResourceID(in.ResourceID) out.Health = corev1.ResourceHealthStatus(in.Health) + out.Message = (*string)(unsafe.Pointer(in.Message)) return nil } diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index cc08203cb66..083e488e01a 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -9524,6 +9524,14 @@ func validateContainerStatusAllocatedResourcesStatus(containerStatuses []core.Co allErrors = append(allErrors, field.NotSupported(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("health"), r.Health, sets.List(supportedResourceHealthValues))) } + if r.Message != nil { + if len(*r.Message) == 0 { + allErrors = append(allErrors, field.Required(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("message"), "must be non-empty if specified")) + } else if len(*r.Message) > v1.ResourceHealthMessageMaxLength { + allErrors = append(allErrors, field.TooLong(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("message"), *r.Message, v1.ResourceHealthMessageMaxLength)) + } + } + if uniqueResources.Has(r.ResourceID) { allErrors = append(allErrors, field.Duplicate(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("resourceID"), r.ResourceID)) } else { diff --git a/pkg/apis/core/validation/validation_test.go b/pkg/apis/core/validation/validation_test.go index bfbae981276..fb73b6f37e1 100644 --- a/pkg/apis/core/validation/validation_test.go +++ b/pkg/apis/core/validation/validation_test.go @@ -27976,6 +27976,103 @@ func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) { field.NotSupported(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("health"), core.ResourceHealthStatus("invalid-health-value"), []string{"Healthy", "Unhealthy", "Unknown"}), }, }, + + "don't allow message longer than max length": { + containers: []core.Container{ + { + Name: "container-1", + Resources: core.ResourceRequirements{ + Requests: core.ResourceList{ + "test.device/test": resource.MustParse("1"), + }, + }, + }, + }, + containerStatuses: []core.ContainerStatus{ + { + Name: "container-1", + AllocatedResourcesStatus: []core.ResourceStatus{ + { + Name: "test.device/test", + Resources: []core.ResourceHealth{ + { + ResourceID: "resource-1", + Health: core.ResourceHealthStatusHealthy, + Message: ptr.To(string(make([]byte, v1.ResourceHealthMessageMaxLength+1))), + }, + }, + }, + }, + }, + }, + wantFieldErrors: field.ErrorList{ + field.TooLong(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("message"), string(make([]byte, v1.ResourceHealthMessageMaxLength+1)), v1.ResourceHealthMessageMaxLength), + }, + }, + + "allow message at exactly max length": { + containers: []core.Container{ + { + Name: "container-1", + Resources: core.ResourceRequirements{ + Requests: core.ResourceList{ + "test.device/test": resource.MustParse("1"), + }, + }, + }, + }, + containerStatuses: []core.ContainerStatus{ + { + Name: "container-1", + AllocatedResourcesStatus: []core.ResourceStatus{ + { + Name: "test.device/test", + Resources: []core.ResourceHealth{ + { + ResourceID: "resource-1", + Health: core.ResourceHealthStatusHealthy, + Message: ptr.To(string(make([]byte, v1.ResourceHealthMessageMaxLength))), + }, + }, + }, + }, + }, + }, + wantFieldErrors: field.ErrorList{}, + }, + + "don't allow empty message": { + containers: []core.Container{ + { + Name: "container-1", + Resources: core.ResourceRequirements{ + Requests: core.ResourceList{ + "test.device/test": resource.MustParse("1"), + }, + }, + }, + }, + containerStatuses: []core.ContainerStatus{ + { + Name: "container-1", + AllocatedResourcesStatus: []core.ResourceStatus{ + { + Name: "test.device/test", + Resources: []core.ResourceHealth{ + { + ResourceID: "resource-1", + Health: core.ResourceHealthStatusHealthy, + Message: ptr.To(""), + }, + }, + }, + }, + }, + }, + wantFieldErrors: field.ErrorList{ + field.Required(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("message"), "must be non-empty if specified"), + }, + }, } for name, tt := range testCases { t.Run(name, func(t *testing.T) { diff --git a/pkg/apis/core/zz_generated.deepcopy.go b/pkg/apis/core/zz_generated.deepcopy.go index 2e04e80f624..ca868885972 100644 --- a/pkg/apis/core/zz_generated.deepcopy.go +++ b/pkg/apis/core/zz_generated.deepcopy.go @@ -5240,6 +5240,11 @@ func (in *ResourceFieldSelector) DeepCopy() *ResourceFieldSelector { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceHealth) DeepCopyInto(out *ResourceHealth) { *out = *in + if in.Message != nil { + in, out := &in.Message, &out.Message + *out = new(string) + **out = **in + } return } @@ -5440,7 +5445,9 @@ func (in *ResourceStatus) DeepCopyInto(out *ResourceStatus) { if in.Resources != nil { in, out := &in.Resources, &out.Resources *out = make([]ResourceHealth, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } return } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 3dea0e00bd7..c459272e493 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -813,6 +813,12 @@ const ( // Adds the AllocatedResourcesStatus to the container status. ResourceHealthStatus featuregate.Feature = "ResourceHealthStatus" + // owner: @SergeyKanzhelev + // kep: https://kep.k8s.io/4680 + // + // Adds a message to the AllocatedResourcesStatus entries. + ResourceHealthStatusMessage featuregate.Feature = "ResourceHealthStatusMessage" + // owner: @yuanwang04 // kep: https://kep.k8s.io/5532 // @@ -1708,6 +1714,11 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate ResourceHealthStatus: { {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha}, + {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, + }, + + ResourceHealthStatusMessage: { + {Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta}, }, RestartAllContainersOnContainerExits: { @@ -2400,6 +2411,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature ResourceHealthStatus: {DynamicResourceAllocation}, + ResourceHealthStatusMessage: {ResourceHealthStatus}, + // RestartAllContainersOnContainerExits introduces a new container restart rule action. // All restart rules will be dropped by API if ContainerRestartRules feature is not enabled. RestartAllContainersOnContainerExits: {ContainerRestartRules, NodeDeclaredFeatures}, diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 9420d70f384..0cb19620003 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -30200,6 +30200,13 @@ func schema_k8sio_api_core_v1_ResourceHealth(ref common.ReferenceCallback) commo Format: "", }, }, + "message": { + SchemaProps: spec.SchemaProps{ + Description: "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"resourceID"}, }, diff --git a/pkg/kubelet/cm/dra/healthinfo.go b/pkg/kubelet/cm/dra/healthinfo.go index 77b856a27f4..68f138e88ed 100644 --- a/pkg/kubelet/cm/dra/healthinfo.go +++ b/pkg/kubelet/cm/dra/healthinfo.go @@ -123,8 +123,13 @@ func (cache *healthInfoCache) saveToCheckpointInternal(logger klog.Logger) error } // getHealthInfo returns the current health info, adjusting for timeouts. -func (cache *healthInfoCache) getHealthInfo(driverName, poolName, deviceName string) state.DeviceHealthStatus { - res := state.DeviceHealthStatusUnknown +func (cache *healthInfoCache) getHealthInfo(driverName, poolName, deviceName string) state.DeviceHealth { + res := state.DeviceHealth{ + PoolName: poolName, + DeviceName: deviceName, + Health: state.DeviceHealthStatusUnknown, + Message: "", + } _ = cache.withRLock(func() error { now := time.Now() @@ -139,9 +144,11 @@ func (cache *healthInfoCache) getHealthInfo(driverName, poolName, deviceName str // Check if device health has timed out if now.Sub(device.LastUpdated) > timeout { - res = state.DeviceHealthStatusUnknown + // Keep default Unknown status, clear message for stale device + res.Health = state.DeviceHealthStatusUnknown + res.Message = "" } else { - res = device.Health + res = device } } } @@ -176,7 +183,8 @@ func (cache *healthInfoCache) updateHealthInfo(logger klog.Logger, driverName st existingDevice, ok := currentDriver.Devices[key] - if !ok || existingDevice.Health != reportedDevice.Health || existingDevice.HealthCheckTimeout != reportedDevice.HealthCheckTimeout { + // Consider health status, message, and timeout changes as updates + if !ok || existingDevice.Health != reportedDevice.Health || existingDevice.Message != reportedDevice.Message || existingDevice.HealthCheckTimeout != reportedDevice.HealthCheckTimeout { changedDevices = append(changedDevices, reportedDevice) } @@ -197,6 +205,7 @@ func (cache *healthInfoCache) updateHealthInfo(logger klog.Logger, driverName st // Mark as unknown if the device health has timed out if existingDevice.Health != state.DeviceHealthStatusUnknown && now.Sub(existingDevice.LastUpdated) > timeout { existingDevice.Health = state.DeviceHealthStatusUnknown + existingDevice.Message = "" existingDevice.LastUpdated = now currentDriver.Devices[key] = existingDevice diff --git a/pkg/kubelet/cm/dra/healthinfo_test.go b/pkg/kubelet/cm/dra/healthinfo_test.go index f24e7d837c9..e10df9f5599 100644 --- a/pkg/kubelet/cm/dra/healthinfo_test.go +++ b/pkg/kubelet/cm/dra/healthinfo_test.go @@ -200,12 +200,12 @@ func TestGetHealthInfo(t *testing.T) { require.NoError(t, err) // Initial state - assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health) // Add a device _, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth}) require.NoError(t, err) - assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health) // Test timeout (simulated with old LastUpdated) err = cache.withLock(func() error { @@ -218,7 +218,7 @@ func TestGetHealthInfo(t *testing.T) { return nil }) require.NoError(t, err) - assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health) } // TestGetHealthInfoRobust tests retrieving health status logic solely & against many cases. @@ -353,7 +353,7 @@ func TestGetHealthInfoRobust(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cache := &healthInfoCache{HealthInfo: tt.initialState} - health := cache.getHealthInfo(tt.driverName, tt.poolName, tt.deviceName) + health := cache.getHealthInfo(tt.driverName, tt.poolName, tt.deviceName).Health assert.Equal(t, tt.expectedHealth, health) }) } @@ -372,7 +372,7 @@ func TestUpdateHealthInfo(t *testing.T) { changedDevices, err := cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth}) require.NoError(t, err) assertDeviceHealthElementsMatchIgnoreTime(t, expectedChanged1, changedDevices) - assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health) // 2 -- Update with no change changedDevices, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth}) @@ -386,7 +386,7 @@ func TestUpdateHealthInfo(t *testing.T) { changedDevices, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{newHealth}) require.NoError(t, err) assertDeviceHealthElementsMatchIgnoreTime(t, expectedChanged3, changedDevices) - assert.Equal(t, state.DeviceHealthStatusUnhealthy, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusUnhealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health) // 4 -- Add second device, omit first secondDevice := state.DeviceHealth{PoolName: testPool, DeviceName: "device2", Health: state.DeviceHealthStatusHealthy, HealthCheckTimeout: DefaultHealthTimeout} @@ -408,14 +408,14 @@ func TestUpdateHealthInfo(t *testing.T) { changedDevices, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{secondDevice}) require.NoError(t, err) assertDeviceHealthElementsMatchIgnoreTime(t, expectedChanged4, changedDevices) - assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, "device2")) - assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, "device2").Health) + assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health) // 5 -- Test persistence cache2, err := newHealthInfoCache(logger, tmpFile) require.NoError(t, err) - assert.Equal(t, state.DeviceHealthStatusHealthy, cache2.getHealthInfo(testDriver, testPool, "device2")) - assert.Equal(t, state.DeviceHealthStatusUnknown, cache2.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusHealthy, cache2.getHealthInfo(testDriver, testPool, "device2").Health) + assert.Equal(t, state.DeviceHealthStatusUnknown, cache2.getHealthInfo(testDriver, testPool, testDevice).Health) // 6 -- Test how updateHealthInfo handles device timeouts timeoutDevice := state.DeviceHealth{PoolName: testPool, DeviceName: "timeoutDevice", Health: "Unhealthy", HealthCheckTimeout: DefaultHealthTimeout} @@ -453,9 +453,9 @@ func TestClearDriver(t *testing.T) { _, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth}) require.NoError(t, err) - assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health) err = cache.clearDriver(logger, testDriver) require.NoError(t, err) - assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice)) + assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health) } diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index c6417304c95..15650df6ba0 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "path/filepath" + "slices" "strconv" "time" @@ -473,6 +474,16 @@ func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim { return nil } +// truncateHealthMessage truncates a health message to the maximum allowed length. +// If the message exceeds v1.ResourceHealthMessageMaxLength, it is truncated to +// (maxLength - 3) characters and "..." is appended. +func truncateHealthMessage(message string) string { + if len(message) <= v1.ResourceHealthMessageMaxLength { + return message + } + return message[:v1.ResourceHealthMessageMaxLength-3] + "..." +} + // GetResources gets a ContainerInfo object from the claimInfo cache. // This information is used by the caller to update a container config. func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { @@ -830,10 +841,26 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat // Iterate through the claims requested by this specific container. for _, claim := range containerSpec.Resources.Claims { // Find the actual name of the ResourceClaim object. + // Try the O(1) map lookup first, then fall back to resolving + // from the pod spec. The fallback is needed because + // ResourceClaimStatuses may not be populated yet when a + // health update triggers an early status sync. actualClaimName := podClaimStatusMap[claim.Name] if actualClaimName == "" { - logger.V(4).Info("Could not find generated name for resource claim in pod status", "container", containerSpec.Name, "claimName", claim.Name) + for i := range pod.Spec.ResourceClaims { + if pod.Spec.ResourceClaims[i].Name == claim.Name { + claimNamePtr, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) + if err == nil && claimNamePtr != nil { + actualClaimName = *claimNamePtr + } + break + } + } + } + + if actualClaimName == "" { + logger.V(4).Info("Could not find generated name for resource claim", "container", containerSpec.Name, "claimName", claim.Name) continue } @@ -871,26 +898,53 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat // Clear previous health entries before adding current ones. resStatus.Resources = []v1.ResourceHealth{} + // Use a map to track resourceIDs we've already added to avoid duplicates. + // Multiple devices in claimInfo.DriverState may have the same resourceID + // (e.g., when using the same CDI device ID), so we need to deduplicate. + seenResourceIDs := make(map[v1.ResourceID]bool) + for driverName, driverState := range claimInfo.DriverState { for _, device := range driverState.Devices { - healthStr := m.healthInfoCache.getHealthInfo(driverName, device.PoolName, device.DeviceName) + // Skip devices that don't match the request we're reporting for. + // Use BaseRequestRef to handle subrequests (e.g., "request/subrequest" -> "request") + // since device.RequestNames are stored with subrequest suffixes stripped. + if claim.Request != "" && len(device.RequestNames) > 0 && !slices.Contains(device.RequestNames, resourceclaim.BaseRequestRef(claim.Request)) { + continue + } + + healthInfo := m.healthInfoCache.getHealthInfo(driverName, device.PoolName, device.DeviceName) var health v1.ResourceHealthStatus - switch healthStr { - case "Healthy": + switch healthInfo.Health { + case state.DeviceHealthStatusHealthy: health = v1.ResourceHealthStatusHealthy - case "Unhealthy": + case state.DeviceHealthStatusUnhealthy: health = v1.ResourceHealthStatusUnhealthy default: health = v1.ResourceHealthStatusUnknown } - resourceHealth := v1.ResourceHealth{Health: health} + // Create the ResourceHealth entry + resourceHealth := v1.ResourceHealth{ + Health: health, + } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ResourceHealthStatusMessage) && len(healthInfo.Message) > 0 { + resourceHealth.Message = &healthInfo.Message + } + + // Use first CDI device ID as ResourceID, with fallback if len(device.CDIDeviceIDs) > 0 { resourceHealth.ResourceID = v1.ResourceID(device.CDIDeviceIDs[0]) } else { resourceHealth.ResourceID = v1.ResourceID(fmt.Sprintf("%s/%s/%s", driverName, device.PoolName, device.DeviceName)) } + + // Skip if we've already added this resourceID + if seenResourceIDs[resourceHealth.ResourceID] { + continue + } + seenResourceIDs[resourceHealth.ResourceID] = true + resStatus.Resources = append(resStatus.Resources, resourceHealth) } } @@ -973,6 +1027,7 @@ func (m *Manager) HandleWatchResourcesStream(ctx context.Context, stream draheal Health: health, LastUpdated: time.Unix(d.GetLastUpdatedTime(), 0), HealthCheckTimeout: timeout, + Message: truncateHealthMessage(d.GetMessage()), } } diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index f6a4abbad09..c8f291ba377 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -50,6 +50,7 @@ import ( "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/utils/ptr" drahealthv1alpha1 "k8s.io/kubelet/pkg/apis/dra-health/v1alpha1" drapb "k8s.io/kubelet/pkg/apis/dra/v1" @@ -57,7 +58,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/dra/state" "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/test/utils/ktesting" - "k8s.io/utils/ptr" ) const ( @@ -1746,7 +1746,7 @@ func TestHandleWatchResourcesStream(t *testing.T) { } // Check cache state - cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName) + cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName).Health assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Cache update check failed") t.Log("HealthChangeForAllocatedDevice: Closing responses channel to signal EOF") @@ -1804,7 +1804,7 @@ func TestHandleWatchResourcesStream(t *testing.T) { } // Check health cache for the "other-device" - cachedHealthOther := manager.healthInfoCache.getHealthInfo(driverName, poolName, "other-device") + cachedHealthOther := manager.healthInfoCache.getHealthInfo(driverName, poolName, "other-device").Health assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealthOther, "Cache update for other-device failed") close(responses) @@ -1989,7 +1989,7 @@ func TestHandleWatchResourcesStream(t *testing.T) { // Verify health cache is updated correctly cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName) - assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Health cache should be updated for device with ShareID") + assert.Equal(t, state.DeviceHealthStatusUnhealthy, cachedHealth.Health, "Health cache should be updated for device with ShareID") // Verify the claim still has the ShareID set (it shouldn't be lost during health updates) claimFromCache, exists := manager.cache.get(claimName, namespace) @@ -2112,7 +2112,7 @@ func TestHandleWatchResourcesStream(t *testing.T) { // Verify health cache is updated cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName) - assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Health cache should show Unhealthy for shared device") + assert.Equal(t, state.DeviceHealthStatusUnhealthy, cachedHealth.Health, "Health cache should show Unhealthy for shared device") t.Log("HealthChangeForMultiplePodsWithSharedDevice: Closing responses channel to signal EOF") close(responses) @@ -2184,7 +2184,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { { Name: "claim:claim1", Resources: []v1.ResourceHealth{ - {ResourceID: "test-driver/pool/dev-a", Health: v1.ResourceHealthStatusHealthy}, + {ResourceID: "test-driver/pool/dev-a", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")}, }, }, }, @@ -2219,7 +2219,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { { Name: "claim:renamed-pod-claim", Resources: []v1.ResourceHealth{ - {ResourceID: "test-driver/pool/dev-b", Health: v1.ResourceHealthStatusHealthy}, + {ResourceID: "test-driver/pool/dev-b", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")}, }, }, }, @@ -2254,7 +2254,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { { Name: "claim:templated-claim", Resources: []v1.ResourceHealth{ - {ResourceID: "test-driver/pool/dev-c", Health: v1.ResourceHealthStatusHealthy}, + {ResourceID: "test-driver/pool/dev-c", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")}, }, }, }, @@ -2299,7 +2299,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { { Name: "claim:shareid-claim", Resources: []v1.ResourceHealth{ - {ResourceID: "test-driver/pool/dev-shared", Health: v1.ResourceHealthStatusHealthy}, + {ResourceID: "test-driver/pool/dev-shared", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")}, }, }, }, @@ -2308,6 +2308,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatusMessage, true) tCtx := ktesting.Init(t) logger := tCtx.Logger() manager, err := NewManager(logger, nil, t.TempDir()) @@ -2322,7 +2323,12 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { for _, ci := range tc.claimInfos { for driverName, ds := range ci.DriverState { for _, dev := range ds.Devices { - devices = append(devices, state.DeviceHealth{PoolName: dev.PoolName, DeviceName: dev.DeviceName, Health: state.DeviceHealthStatusHealthy}) + devices = append(devices, state.DeviceHealth{ + PoolName: dev.PoolName, + DeviceName: dev.DeviceName, + Health: state.DeviceHealthStatusHealthy, + Message: "Device is operating normally", + }) } _, err := manager.healthInfoCache.updateHealthInfo(logger, driverName, devices) require.NoError(t, err) @@ -2337,3 +2343,194 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) { }) } } + +func TestUpdateAllocatedResourcesStatus_Subrequest(t *testing.T) { + directClaimName := "test-claim" + + testCases := []struct { + name string + claimRequest string + deviceRequestNames []string + expectHealthReported bool + expectedHealthMessage string + expectedHealthStatus v1.ResourceHealthStatus + }{ + { + name: "Subrequest matches base name", + claimRequest: "req-base/sub", + deviceRequestNames: []string{"req-base"}, + expectHealthReported: true, + expectedHealthMessage: "Device healthy", + expectedHealthStatus: v1.ResourceHealthStatusHealthy, + }, + { + name: "Subrequest does not match different base name", + claimRequest: "req-other/sub", + deviceRequestNames: []string{"req-base"}, + expectHealthReported: false, + }, + { + name: "Base request matches base name", + claimRequest: "req-base", + deviceRequestNames: []string{"req-base"}, + expectHealthReported: true, + expectedHealthMessage: "Device healthy", + expectedHealthStatus: v1.ResourceHealthStatusHealthy, + }, + { + name: "Empty request matches any device", + claimRequest: "", + deviceRequestNames: []string{"req-base"}, + expectHealthReported: true, + expectedHealthMessage: "Device healthy", + expectedHealthStatus: v1.ResourceHealthStatusHealthy, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatusMessage, true) + tCtx := ktesting.Init(t) + manager, err := NewManager(tCtx.Logger(), nil, t.TempDir()) + require.NoError(t, err) + + // Setup claim info with device + claimInfo := &ClaimInfo{ + ClaimInfoState: state.ClaimInfoState{ + ClaimName: directClaimName, + PodUIDs: sets.New("test-pod-uid"), + DriverState: map[string]state.DriverState{ + "test-driver": { + Devices: []state.Device{ + { + PoolName: "test-pool", + DeviceName: "test-device", + RequestNames: tc.deviceRequestNames, + }, + }, + }, + }, + }, + } + manager.cache.add(claimInfo) + + // Setup health cache + devices := []state.DeviceHealth{ + { + PoolName: "test-pool", + DeviceName: "test-device", + Health: state.DeviceHealthStatusHealthy, + Message: "Device healthy", + }, + } + _, err = manager.healthInfoCache.updateHealthInfo(tCtx.Logger(), "test-driver", devices) + require.NoError(t, err) + + // Create pod and status + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod", UID: "test-pod-uid"}, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container1", + Resources: v1.ResourceRequirements{ + Claims: []v1.ResourceClaim{ + {Name: "claim1", Request: tc.claimRequest}, + }, + }, + }, + }, + ResourceClaims: []v1.PodResourceClaim{ + {Name: "claim1", ResourceClaimName: &directClaimName}, + }, + }, + Status: v1.PodStatus{ + ResourceClaimStatuses: []v1.PodResourceClaimStatus{ + {Name: "claim1", ResourceClaimName: &directClaimName}, + }, + }, + } + + status := &v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + {Name: "container1"}, + }, + } + + // Call UpdateAllocatedResourcesStatus + manager.UpdateAllocatedResourcesStatus(pod, status) + + // Assert results + require.Len(t, status.ContainerStatuses, 1) + + if tc.expectHealthReported { + // Should have one ResourceStatus entry + require.Len(t, status.ContainerStatuses[0].AllocatedResourcesStatus, 1) + resStatus := status.ContainerStatuses[0].AllocatedResourcesStatus[0] + + // Should have one ResourceHealth entry + require.Len(t, resStatus.Resources, 1, "Expected exactly one ResourceHealth entry") + healthEntry := resStatus.Resources[0] + + // Verify health matches what we put in the cache + assert.Equal(t, tc.expectedHealthStatus, healthEntry.Health) + assert.Equal(t, tc.expectedHealthMessage, ptr.Deref(healthEntry.Message, "")) + assert.Equal(t, v1.ResourceID("test-driver/test-pool/test-device"), healthEntry.ResourceID) + } else { + // Should have no ResourceStatus entries at all (empty statuses are pruned) + assert.Empty(t, status.ContainerStatuses[0].AllocatedResourcesStatus, + "Expected AllocatedResourcesStatus to be empty when device is filtered out") + } + }) + } +} + +func TestTruncateHealthMessage(t *testing.T) { + testCases := map[string]struct { + input string + expected string + }{ + "empty message": { + input: "", + expected: "", + }, + "short message": { + input: "Device is healthy", + expected: "Device is healthy", + }, + "message at exactly max length": { + input: string(make([]byte, v1.ResourceHealthMessageMaxLength)), + expected: string(make([]byte, v1.ResourceHealthMessageMaxLength)), + }, + "message one character over max length": { + input: string(make([]byte, v1.ResourceHealthMessageMaxLength+1)), + expected: string(make([]byte, v1.ResourceHealthMessageMaxLength-3)) + "...", + }, + "very long message": { + input: string(make([]byte, v1.ResourceHealthMessageMaxLength*2)), + expected: string(make([]byte, v1.ResourceHealthMessageMaxLength-3)) + "...", + }, + "message with meaningful content": { + input: "ECC error count exceeded threshold on device GPU-0", + expected: "ECC error count exceeded threshold on device GPU-0", + }, + "long message with meaningful content": { + input: string(make([]byte, v1.ResourceHealthMessageMaxLength)) + " extra content", + expected: string(make([]byte, v1.ResourceHealthMessageMaxLength-3)) + "...", + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result := truncateHealthMessage(tc.input) + assert.Equal(t, tc.expected, result) + // Verify the result is never longer than max length + assert.LessOrEqual(t, len(result), v1.ResourceHealthMessageMaxLength) + // Verify that if truncation occurred, it ends with "..." + if len(tc.input) > v1.ResourceHealthMessageMaxLength { + assert.Len(t, result, v1.ResourceHealthMessageMaxLength) + assert.Equal(t, "...", result[len(result)-3:]) + } + }) + } +} diff --git a/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go b/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go index 85a0609b6a1..367ad097517 100644 --- a/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go +++ b/pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go @@ -384,7 +384,7 @@ func (pm *DRAPluginManager) add(driverName string, endpoint string, chosenServic } p.conn = conn - if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) && pm.streamHandler != nil { pm.wg.Add(1) go func() { defer pm.wg.Done() diff --git a/pkg/kubelet/cm/dra/state/state.go b/pkg/kubelet/cm/dra/state/state.go index d61a5b65f61..5861f6ff573 100644 --- a/pkg/kubelet/cm/dra/state/state.go +++ b/pkg/kubelet/cm/dra/state/state.go @@ -101,4 +101,8 @@ type DeviceHealth struct { // Zero value means use the default timeout (DefaultHealthTimeout). // This ensures backward compatibility with existing data. HealthCheckTimeout time.Duration + + // Message provides additional details about the device's health status. + // This field is optional and may be empty. + Message string } diff --git a/staging/src/k8s.io/api/core/v1/generated.pb.go b/staging/src/k8s.io/api/core/v1/generated.pb.go index 53b4041897c..fd13da187c7 100644 --- a/staging/src/k8s.io/api/core/v1/generated.pb.go +++ b/staging/src/k8s.io/api/core/v1/generated.pb.go @@ -11564,6 +11564,13 @@ func (m *ResourceHealth) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Message != nil { + i -= len(*m.Message) + copy(dAtA[i:], *m.Message) + i = encodeVarintGenerated(dAtA, i, uint64(len(*m.Message))) + i-- + dAtA[i] = 0x32 + } i -= len(m.Health) copy(dAtA[i:], m.Health) i = encodeVarintGenerated(dAtA, i, uint64(len(m.Health))) @@ -19140,6 +19147,10 @@ func (m *ResourceHealth) Size() (n int) { n += 1 + l + sovGenerated(uint64(l)) l = len(m.Health) n += 1 + l + sovGenerated(uint64(l)) + if m.Message != nil { + l = len(*m.Message) + n += 1 + l + sovGenerated(uint64(l)) + } return n } @@ -23478,6 +23489,7 @@ func (this *ResourceHealth) String() string { s := strings.Join([]string{`&ResourceHealth{`, `ResourceID:` + fmt.Sprintf("%v", this.ResourceID) + `,`, `Health:` + fmt.Sprintf("%v", this.Health) + `,`, + `Message:` + valueToStringGenerated(this.Message) + `,`, `}`, }, "") return s @@ -59695,6 +59707,39 @@ func (m *ResourceHealth) Unmarshal(dAtA []byte) error { } m.Health = ResourceHealthStatus(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGenerated + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGenerated + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGenerated + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(dAtA[iNdEx:postIndex]) + m.Message = &s + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipGenerated(dAtA[iNdEx:]) diff --git a/staging/src/k8s.io/api/core/v1/generated.proto b/staging/src/k8s.io/api/core/v1/generated.proto index d92fc2ec337..522aadf3fcc 100644 --- a/staging/src/k8s.io/api/core/v1/generated.proto +++ b/staging/src/k8s.io/api/core/v1/generated.proto @@ -5473,6 +5473,12 @@ message ResourceHealth { // // In future we may want to introduce the PermanentlyUnhealthy Status. optional string health = 2; + + // Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold"). + // This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise. + // +featureGate=ResourceHealthStatusMessage + // +optional + optional string message = 6; } // ResourceQuota sets aggregate quota restrictions enforced per namespace diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index a790a079e97..18806df459d 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -3418,6 +3418,10 @@ const ( ResourceHealthStatusUnknown ResourceHealthStatus = "Unknown" ) +// ResourceHealthMessageMaxLength is the maximum length for ResourceHealth.Message field. +// Messages longer than this will be truncated with "..." appended. +const ResourceHealthMessageMaxLength = 1024 + // ResourceID is calculated based on the source of this resource health information. // For DevicePlugin: // @@ -3445,6 +3449,11 @@ type ResourceHealth struct { // // In future we may want to introduce the PermanentlyUnhealthy Status. Health ResourceHealthStatus `json:"health,omitempty" protobuf:"bytes,2,name=health"` + // Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold"). + // This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise. + // +featureGate=ResourceHealthStatusMessage + // +optional + Message *string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"` } // ContainerUser represents user identity information diff --git a/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go b/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go index ff1ba7bb99e..8aa1e172cdc 100644 --- a/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go +++ b/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go @@ -2239,6 +2239,7 @@ var map_ResourceHealth = map[string]string{ "": "ResourceHealth represents the health of a resource. It has the latest device health information. This is a part of KEP https://kep.k8s.io/4680.", "resourceID": "ResourceID is the unique identifier of the resource. See the ResourceID type for more information.", "health": "Health of the resource. can be one of:\n - Healthy: operates as normal\n - Unhealthy: reported unhealthy. We consider this a temporary health issue\n since we do not have a mechanism today to distinguish\n temporary and permanent issues.\n - Unknown: The status cannot be determined.\n For example, Device Plugin got unregistered and hasn't been re-registered since.\n\nIn future we may want to introduce the PermanentlyUnhealthy Status.", + "message": "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.", } func (ResourceHealth) SwaggerDoc() map[string]string { diff --git a/staging/src/k8s.io/api/core/v1/zz_generated.deepcopy.go b/staging/src/k8s.io/api/core/v1/zz_generated.deepcopy.go index a04f16d5215..6240aa3e6f6 100644 --- a/staging/src/k8s.io/api/core/v1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/api/core/v1/zz_generated.deepcopy.go @@ -5238,6 +5238,11 @@ func (in *ResourceFieldSelector) DeepCopy() *ResourceFieldSelector { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceHealth) DeepCopyInto(out *ResourceHealth) { *out = *in + if in.Message != nil { + in, out := &in.Message, &out.Message + *out = new(string) + **out = **in + } return } @@ -5438,7 +5443,9 @@ func (in *ResourceStatus) DeepCopyInto(out *ResourceStatus) { if in.Resources != nil { in, out := &in.Resources, &out.Resources *out = make([]ResourceHealth, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } return } diff --git a/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.json b/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.json index b67fedaa357..4a74ea64495 100644 --- a/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.json +++ b/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.json @@ -1915,7 +1915,8 @@ "resources": [ { "resourceID": "resourceIDValue", - "health": "healthValue" + "health": "healthValue", + "message": "messageValue" } ] } @@ -2013,7 +2014,8 @@ "resources": [ { "resourceID": "resourceIDValue", - "health": "healthValue" + "health": "healthValue", + "message": "messageValue" } ] } @@ -2112,7 +2114,8 @@ "resources": [ { "resourceID": "resourceIDValue", - "health": "healthValue" + "health": "healthValue", + "message": "messageValue" } ] } diff --git a/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.pb b/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.pb index dda95e42eaeae8e3f1fa4d9a9643caccd74598d7..7b5ea99402782378921929023451c213b2381b8a 100644 GIT binary patch delta 153 zcmZ3TdNoxr+oG6(i<66~%ut9qAU{QD|3<++EyfL#7idjqT(mh(_XZ=MB^PI2Vs2_! pVoqtQkj`W|T{%u8p4`;p;>7gS$>zFhM5&9_w<1Q}=IMqai~w*bFRlOp delta 131 zcmcbbx;j-b+oG6(i<66~%ut9qAU{QD>PEpnEylLV3$&&)PT8EJdxMcrn~O6qF*h|V jF{drLw-g8pE4;7 diff --git a/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.yaml b/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.yaml index 0f3e923720a..9c255298cd9 100644 --- a/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.yaml +++ b/staging/src/k8s.io/api/testdata/HEAD/core.v1.Pod.yaml @@ -1247,6 +1247,7 @@ status: - name: nameValue resources: - health: healthValue + message: messageValue resourceID: resourceIDValue containerID: containerIDValue image: imageValue @@ -1313,6 +1314,7 @@ status: - name: nameValue resources: - health: healthValue + message: messageValue resourceID: resourceIDValue containerID: containerIDValue image: imageValue @@ -1388,6 +1390,7 @@ status: - name: nameValue resources: - health: healthValue + message: messageValue resourceID: resourceIDValue containerID: containerIDValue image: imageValue diff --git a/staging/src/k8s.io/api/testdata/HEAD/core.v1.PodStatusResult.json b/staging/src/k8s.io/api/testdata/HEAD/core.v1.PodStatusResult.json index d5f0c40524a..25c9487c845 100644 --- a/staging/src/k8s.io/api/testdata/HEAD/core.v1.PodStatusResult.json +++ b/staging/src/k8s.io/api/testdata/HEAD/core.v1.PodStatusResult.json @@ -163,7 +163,8 @@ "resources": [ { "resourceID": "resourceIDValue", - "health": "healthValue" + "health": "healthValue", + "message": "messageValue" } ] } @@ -261,7 +262,8 @@ "resources": [ { "resourceID": "resourceIDValue", - "health": "healthValue" + "health": "healthValue", + "message": "messageValue" } ] } @@ -360,7 +362,8 @@ "resources": [ { "resourceID": "resourceIDValue", - "health": "healthValue" + "health": "healthValue", + "message": "messageValue" } ] } diff --git a/staging/src/k8s.io/api/testdata/HEAD/core.v1.PodStatusResult.pb b/staging/src/k8s.io/api/testdata/HEAD/core.v1.PodStatusResult.pb index 3f62fcd3295fa8d5c4acd9f970828b789b03e218..2e71a452462362f70fc63816d01071ae3b410931 100644 GIT binary patch delta 147 zcmeAZ-XT0ehH0AcM!6J5#toC38K*NY+U&@3gOSgYi!(1VH#ICVr!-YaXEGy;9H$Xa eZfbFHVtVRi2R5NodeDeclaredFeatures | [code](https://cs.k8s.io/?q=%5CbRestartAllContainersOnContainerExits%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRestartAllContainersOnContainerExits%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | RetryGenerateName | :ballot_box_with_check: 1.31+ | :closed_lock_with_key: 1.32+ | 1.30 | 1.31 | 1.32– | | | [code](https://cs.k8s.io/?q=%5CbRetryGenerateName%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRetryGenerateName%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | RotateKubeletServerCertificate | :ballot_box_with_check: 1.12+ | | 1.7–1.11 | 1.12– | | | | [code](https://cs.k8s.io/?q=%5CbRotateKubeletServerCertificate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRotateKubeletServerCertificate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index ad22ce8406b..c512d0bbb48 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1513,6 +1513,16 @@ lockToDefault: false preRelease: Alpha version: "1.31" + - default: true + lockToDefault: false + preRelease: Beta + version: "1.36" +- name: ResourceHealthStatusMessage + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.36" - name: RestartAllContainersOnContainerExits versionedSpecs: - default: false diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index ec286021796..e6c68f6e576 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -3135,4 +3135,191 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() { multipleDriversContext("using only drapbv1", false, true) multipleDriversContext("using drapbv1beta1 and drapbv1", true, true) }) + + // Test for device health reporting with custom messages + framework.Context("kubelet", feature.DynamicResourceAllocation, func() { + nodes := drautils.NewNodes(f, 1, 1) + driver := drautils.NewDriver(f, nodes, drautils.NetworkResources(10, false)) + b := drautils.NewBuilder(f, driver) + + f.It("should report device health with custom messages", f.WithLabel("KubeletMinVersion:1.36"), framework.WithFeatureGate(features.ResourceHealthStatusMessage), func(ctx context.Context) { + claimName := "health-test-claim" + claim := b.ExternalClaim() + claim.Name = claimName + pod := b.PodExternal(claimName) + pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name + b.Create(f.TContext(ctx), claim, pod) + + // Wait for pod to start running + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") + + // Get the pool name and device name from the allocated claim + allocatedClaim, err := f.ClientSet.ResourceV1().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(allocatedClaim.Status.Allocation).ToNot(gomega.BeNil()) + gomega.Expect(allocatedClaim.Status.Allocation.Devices.Results).To(gomega.HaveLen(1)) + + poolName := allocatedClaim.Status.Allocation.Devices.Results[0].Pool + deviceName := allocatedClaim.Status.Allocation.Devices.Results[0].Device + + // Get the plugin for the node where the pod is scheduled + scheduledPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + plugin, ok := driver.Nodes[scheduledPod.Spec.NodeName] + if !ok { + framework.Failf("pod got scheduled to node %s without a plugin", scheduledPod.Spec.NodeName) + } + + // Test 1: Set and check initial healthy status with message + ginkgo.By("Setting initial healthy status with message") + plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{ + PoolName: poolName, + DeviceName: deviceName, + Health: "Healthy", + Message: "Device operating normally, temperature: 45°C", + } + + ginkgo.By("Checking initial healthy status with message") + gomega.Eventually(ctx, func(ctx context.Context) string { + updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return "" + } + + for _, cs := range updatedPod.Status.ContainerStatuses { + for _, rs := range cs.AllocatedResourcesStatus { + if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) { + for _, rh := range rs.Resources { + return ptr.Deref(rh.Message, "") + } + } + } + } + return "" + }).WithTimeout(30 * time.Second).Should(gomega.Equal("Device operating normally, temperature: 45°C")) + + // Test 2: Mark device as unhealthy with error message + ginkgo.By("Marking device as unhealthy with error message") + plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{ + PoolName: poolName, + DeviceName: deviceName, + Health: "Unhealthy", + Message: "ECC error count exceeded threshold (15 errors in last hour)", + } + + // First Eventually: Check health status + gomega.Eventually(ctx, func(ctx context.Context) v1.ResourceHealthStatus { + updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return "" + } + + for _, cs := range updatedPod.Status.ContainerStatuses { + for _, rs := range cs.AllocatedResourcesStatus { + if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) { + for _, rh := range rs.Resources { + return rh.Health + } + } + } + } + return "" + }).WithTimeout(30 * time.Second).Should(gomega.Equal(v1.ResourceHealthStatusUnhealthy)) + + // Second Eventually: Check message + gomega.Eventually(ctx, func(ctx context.Context) string { + updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return "" + } + + for _, cs := range updatedPod.Status.ContainerStatuses { + for _, rs := range cs.AllocatedResourcesStatus { + if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) { + for _, rh := range rs.Resources { + return ptr.Deref(rh.Message, "") + } + } + } + } + return "" + }).WithTimeout(30 * time.Second).Should(gomega.Equal("ECC error count exceeded threshold (15 errors in last hour)")) + + // Test 3: Update message while still unhealthy + ginkgo.By("Updating message while device remains unhealthy") + plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{ + PoolName: poolName, + DeviceName: deviceName, + Health: "Unhealthy", + Message: "Critical: Memory corruption detected", + } + + gomega.Eventually(ctx, func(ctx context.Context) string { + updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return "" + } + + for _, cs := range updatedPod.Status.ContainerStatuses { + for _, rs := range cs.AllocatedResourcesStatus { + if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) { + for _, rh := range rs.Resources { + if rh.Health == v1.ResourceHealthStatusUnhealthy { + return ptr.Deref(rh.Message, "") + } + } + } + } + } + return "" + }).WithTimeout(30 * time.Second).Should(gomega.Equal("Critical: Memory corruption detected")) + + // Test 4: Mark device as healthy with recovery message + ginkgo.By("Marking device as healthy with recovery message") + plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{ + PoolName: poolName, + DeviceName: deviceName, + Health: "Healthy", + Message: "Recovered after reset, all diagnostics passed", + } + + // First Eventually: Check health status + gomega.Eventually(ctx, func(ctx context.Context) v1.ResourceHealthStatus { + updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return "" + } + + for _, cs := range updatedPod.Status.ContainerStatuses { + for _, rs := range cs.AllocatedResourcesStatus { + if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) { + for _, rh := range rs.Resources { + return rh.Health + } + } + } + } + return "" + }).WithTimeout(30 * time.Second).Should(gomega.Equal(v1.ResourceHealthStatusHealthy)) + + // Second Eventually: Check message + gomega.Eventually(ctx, func(ctx context.Context) string { + updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return "" + } + + for _, cs := range updatedPod.Status.ContainerStatuses { + for _, rs := range cs.AllocatedResourcesStatus { + if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) { + for _, rh := range rs.Resources { + return ptr.Deref(rh.Message, "") + } + } + } + } + return "" + }).WithTimeout(30 * time.Second).Should(gomega.Equal("Recovered after reset, all diagnostics passed")) + }) + }) }) diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index a31b5513220..d6a557528cb 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -54,6 +54,12 @@ type DeviceHealthUpdate struct { PoolName string DeviceName string Health string + Message string +} + +type deviceHealthInfo struct { + status string + message string } type ExamplePlugin struct { @@ -76,7 +82,7 @@ type ExamplePlugin struct { gRPCCalls []GRPCCall healthMutex sync.Mutex - deviceHealth map[string]string + deviceHealth map[string]deviceHealthInfo HealthControlChan chan DeviceHealthUpdate blockPrepareResourcesMutex sync.Mutex @@ -205,7 +211,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube nodeName: nodeName, prepared: make(map[ClaimID][]kubeletplugin.Device), cancelMainContext: testOpts.cancelMainContext, - deviceHealth: make(map[string]string), + deviceHealth: make(map[string]deviceHealthInfo), HealthControlChan: make(chan DeviceHealthUpdate, 10), } @@ -633,7 +639,10 @@ func (ex *ExamplePlugin) NodeWatchResources(req *drahealthv1alpha1.NodeWatchReso logger.V(3).Info("Received health update from control channel", "update", update) ex.healthMutex.Lock() key := update.PoolName + "/" + update.DeviceName - ex.deviceHealth[key] = update.Health + ex.deviceHealth[key] = deviceHealthInfo{ + status: update.Health, + message: update.Message, + } ex.healthMutex.Unlock() if err := ex.sendHealthUpdate(srv); err != nil { @@ -657,7 +666,7 @@ func (ex *ExamplePlugin) sendHealthUpdate(srv drahealthv1alpha1.DRAResourceHealt healthUpdates := []*drahealthv1alpha1.DeviceHealth{} ex.healthMutex.Lock() - for key, health := range ex.deviceHealth { + for key, healthInfo := range ex.deviceHealth { parts := strings.SplitN(key, "/", 2) if len(parts) != 2 { continue @@ -666,7 +675,7 @@ func (ex *ExamplePlugin) sendHealthUpdate(srv drahealthv1alpha1.DRAResourceHealt deviceName := parts[1] var healthEnum drahealthv1alpha1.HealthStatus - switch health { + switch healthInfo.status { case "Healthy": healthEnum = drahealthv1alpha1.HealthStatus_HEALTHY case "Unhealthy": @@ -682,6 +691,7 @@ func (ex *ExamplePlugin) sendHealthUpdate(srv drahealthv1alpha1.DRAResourceHealt }, Health: healthEnum, LastUpdatedTime: time.Now().Unix(), + Message: healthInfo.message, }) } ex.healthMutex.Unlock() diff --git a/test/e2e/dra/utils/deploy.go b/test/e2e/dra/utils/deploy.go index c44f2101b38..1ea7c104aa0 100644 --- a/test/e2e/dra/utils/deploy.go +++ b/test/e2e/dra/utils/deploy.go @@ -662,6 +662,7 @@ func (d *Driver) SetUp(tCtx ktesting.TContext, kubeletRootDir string, nodes *Nod } plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps, + app.Options{EnableHealthService: true}, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { return d.interceptor(nodename, ctx, req, info, handler)