KEP-4680: Add message field support to DRA device health reporting

Author: Carlos Eduardo Arango Gutierrez <eduardoa@nvidia.com>
Co-Authored-By: Harshal Patil <12152047+harche@users.noreply.github.com>

Signed-off-by: Harshal Patil <12152047+harche@users.noreply.github.com>
This commit is contained in:
Carlos Eduardo Arango Gutierrez 2025-11-10 15:50:09 -05:00 committed by Harshal Patil
parent 2a5b28436c
commit ad6c155449
37 changed files with 1507 additions and 73 deletions

View file

@ -11283,6 +11283,10 @@
"description": "Health of the resource. can be one of:\n - Healthy: operates as normal\n - Unhealthy: reported unhealthy. We consider this a temporary health issue\n since we do not have a mechanism today to distinguish\n temporary and permanent issues.\n - Unknown: The status cannot be determined.\n For example, Device Plugin got unregistered and hasn't been re-registered since.\n\nIn future we may want to introduce the PermanentlyUnhealthy Status.",
"type": "string"
},
"message": {
"description": "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.",
"type": "string"
},
"resourceID": {
"description": "ResourceID is the unique identifier of the resource. See the ResourceID type for more information.",
"type": "string"

View file

@ -6941,6 +6941,10 @@
"description": "Health of the resource. can be one of:\n - Healthy: operates as normal\n - Unhealthy: reported unhealthy. We consider this a temporary health issue\n since we do not have a mechanism today to distinguish\n temporary and permanent issues.\n - Unknown: The status cannot be determined.\n For example, Device Plugin got unregistered and hasn't been re-registered since.\n\nIn future we may want to introduce the PermanentlyUnhealthy Status.",
"type": "string"
},
"message": {
"description": "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.",
"type": "string"
},
"resourceID": {
"default": "",
"description": "ResourceID is the unique identifier of the resource. See the ResourceID type for more information.",

View file

@ -1056,7 +1056,7 @@ func dropDisabledPodStatusFields(podStatus, oldPodStatus *api.PodStatus, podSpec
}
}
if !utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) && !resourceHealthStatusInUse(oldPodStatus) {
setAllocatedResourcesStatusToNil := func(csl []api.ContainerStatus) {
for i := range csl {
csl[i].AllocatedResourcesStatus = nil
@ -1067,6 +1067,21 @@ func dropDisabledPodStatusFields(podStatus, oldPodStatus *api.PodStatus, podSpec
setAllocatedResourcesStatusToNil(podStatus.EphemeralContainerStatuses)
}
if !utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatusMessage) && !resourceHealthStatusMessageInUse(oldPodStatus) {
dropMessageField := func(csl []api.ContainerStatus) {
for i := range csl {
for j := range csl[i].AllocatedResourcesStatus {
for k := range csl[i].AllocatedResourcesStatus[j].Resources {
csl[i].AllocatedResourcesStatus[j].Resources[k].Message = nil
}
}
}
}
dropMessageField(podStatus.ContainerStatuses)
dropMessageField(podStatus.InitContainerStatuses)
dropMessageField(podStatus.EphemeralContainerStatuses)
}
// drop ContainerStatus.User field to empty (disable SupplementalGroupsPolicy)
if !utilfeature.DefaultFeatureGate.Enabled(features.SupplementalGroupsPolicy) && !supplementalGroupsPolicyInUse(oldPodSpec) {
dropUserField := func(csl []api.ContainerStatus) {
@ -1111,6 +1126,64 @@ func draExendedResourceInUse(podStatus *api.PodStatus) bool {
return false
}
func resourceHealthStatusInUse(podStatus *api.PodStatus) bool {
if podStatus == nil {
return false
}
checkContainerStatuses := func(csl []api.ContainerStatus) bool {
for _, cs := range csl {
if len(cs.AllocatedResourcesStatus) > 0 {
return true
}
}
return false
}
if checkContainerStatuses(podStatus.ContainerStatuses) {
return true
}
if checkContainerStatuses(podStatus.InitContainerStatuses) {
return true
}
if checkContainerStatuses(podStatus.EphemeralContainerStatuses) {
return true
}
return false
}
func resourceHealthStatusMessageInUse(podStatus *api.PodStatus) bool {
if podStatus == nil {
return false
}
checkContainerStatuses := func(csl []api.ContainerStatus) bool {
for _, cs := range csl {
for _, rs := range cs.AllocatedResourcesStatus {
for _, rh := range rs.Resources {
if rh.Message != nil {
return true
}
}
}
}
return false
}
if checkContainerStatuses(podStatus.ContainerStatuses) {
return true
}
if checkContainerStatuses(podStatus.InitContainerStatuses) {
return true
}
if checkContainerStatuses(podStatus.EphemeralContainerStatuses) {
return true
}
return false
}
func dynamicResourceAllocationInUse(podSpec *api.PodSpec) bool {
// We only need to check this field because the containers cannot have
// resource requirements entries for claims without a corresponding

View file

@ -6734,3 +6734,633 @@ func TestDropDisabledPodStatusFields_InPlacePodLevelResourcesVerticalScaling(t *
})
}
}
func TestResourceHealthStatusInUse(t *testing.T) {
testCases := []struct {
name string
podStatus *api.PodStatus
expected bool
}{
{
name: "nil pod status",
podStatus: nil,
expected: false,
},
{
name: "empty pod status",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{},
},
expected: false,
},
{
name: "pod status with AllocatedResourcesStatus in container",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "test-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
},
},
},
},
},
},
},
expected: true,
},
{
name: "pod status with AllocatedResourcesStatus in init container",
podStatus: &api.PodStatus{
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
},
},
},
},
},
},
},
expected: true,
},
{
name: "pod status with AllocatedResourcesStatus in ephemeral container",
podStatus: &api.PodStatus{
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
},
},
},
},
},
},
},
expected: true,
},
{
name: "pod status without AllocatedResourcesStatus",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "test-container",
},
},
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
},
},
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
},
},
},
expected: false,
},
{
name: "pod status with empty AllocatedResourcesStatus array",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "test-container",
AllocatedResourcesStatus: []api.ResourceStatus{},
},
},
},
expected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := resourceHealthStatusInUse(tc.podStatus)
if result != tc.expected {
t.Errorf("resourceHealthStatusInUse() = %v, want %v", result, tc.expected)
}
})
}
}
func TestDropDisabledPodStatusFields_ResourceHealthStatus(t *testing.T) {
podStatusWithResourceHealth := func() *api.PodStatus {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "container1",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
},
},
},
},
},
},
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/gpu",
Resources: []api.ResourceHealth{
{
ResourceID: "gpu-1",
Health: api.ResourceHealthStatusUnhealthy,
},
},
},
},
},
},
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/nic",
Resources: []api.ResourceHealth{
{
ResourceID: "nic-1",
Health: api.ResourceHealthStatusHealthy,
},
},
},
},
},
},
}
}
podStatusWithoutResourceHealth := func() *api.PodStatus {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "container1",
},
},
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
},
},
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
},
},
}
}
podStatusNilResourceHealth := func() *api.PodStatus {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "container1",
AllocatedResourcesStatus: nil,
},
},
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
AllocatedResourcesStatus: nil,
},
},
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
AllocatedResourcesStatus: nil,
},
},
}
}
tests := []struct {
name string
enabled bool
podStatus *api.PodStatus
oldPodStatus *api.PodStatus
wantPodStatus *api.PodStatus
}{
{
name: "feature enabled, old=without, new=without",
enabled: true,
oldPodStatus: podStatusWithoutResourceHealth(),
podStatus: podStatusWithoutResourceHealth(),
wantPodStatus: podStatusWithoutResourceHealth(),
},
{
name: "feature enabled, old=with, new=with",
enabled: true,
oldPodStatus: podStatusWithResourceHealth(),
podStatus: podStatusWithResourceHealth(),
wantPodStatus: podStatusWithResourceHealth(),
},
{
name: "feature enabled, old=without, new=with",
enabled: true,
oldPodStatus: podStatusWithoutResourceHealth(),
podStatus: podStatusWithResourceHealth(),
wantPodStatus: podStatusWithResourceHealth(),
},
{
name: "feature disabled, old=without, new=without",
enabled: false,
oldPodStatus: podStatusWithoutResourceHealth(),
podStatus: podStatusWithoutResourceHealth(),
wantPodStatus: podStatusNilResourceHealth(),
},
{
name: "feature disabled, old=without, new=with (should drop)",
enabled: false,
oldPodStatus: podStatusWithoutResourceHealth(),
podStatus: podStatusWithResourceHealth(),
wantPodStatus: podStatusNilResourceHealth(),
},
{
name: "feature disabled, old=with, new=with (should preserve - bug fix)",
enabled: false,
oldPodStatus: podStatusWithResourceHealth(),
podStatus: podStatusWithResourceHealth(),
wantPodStatus: podStatusWithResourceHealth(),
},
{
name: "feature disabled, old=with, new=without (should preserve nil)",
enabled: false,
oldPodStatus: podStatusWithResourceHealth(),
podStatus: podStatusWithoutResourceHealth(),
wantPodStatus: podStatusWithoutResourceHealth(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, tt.enabled)
dropDisabledPodStatusFields(tt.podStatus, tt.oldPodStatus, &api.PodSpec{}, &api.PodSpec{})
if !reflect.DeepEqual(tt.podStatus, tt.wantPodStatus) {
t.Errorf("dropDisabledPodStatusFields() = %v, want %v\ndiff: %v",
tt.podStatus, tt.wantPodStatus, cmp.Diff(tt.wantPodStatus, tt.podStatus))
}
})
}
}
func TestResourceHealthStatusMessageInUse(t *testing.T) {
message := "test message"
testCases := []struct {
name string
podStatus *api.PodStatus
expected bool
}{
{
name: "nil pod status",
podStatus: nil,
expected: false,
},
{
name: "empty pod status",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{},
},
expected: false,
},
{
name: "pod status with message in container",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "test-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
Message: &message,
},
},
},
},
},
},
},
expected: true,
},
{
name: "pod status with message in init container",
podStatus: &api.PodStatus{
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
Message: &message,
},
},
},
},
},
},
},
expected: true,
},
{
name: "pod status with message in ephemeral container",
podStatus: &api.PodStatus{
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
Message: &message,
},
},
},
},
},
},
},
expected: true,
},
{
name: "pod status without message (nil pointer)",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "test-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
Message: nil,
},
},
},
},
},
},
},
expected: false,
},
{
name: "pod status with AllocatedResourcesStatus but no message",
podStatus: &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "test-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
},
},
},
},
},
},
},
expected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := resourceHealthStatusMessageInUse(tc.podStatus)
if result != tc.expected {
t.Errorf("resourceHealthStatusMessageInUse() = %v, want %v", result, tc.expected)
}
})
}
}
func TestDropDisabledPodStatusFields_ResourceHealthStatusMessage(t *testing.T) {
message1 := "ECC error detected"
message2 := "GPU temperature high"
message3 := "NIC link down"
podStatusWithMessage := func() *api.PodStatus {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "container1",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
Message: &message1,
},
},
},
},
},
},
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/gpu",
Resources: []api.ResourceHealth{
{
ResourceID: "gpu-1",
Health: api.ResourceHealthStatusUnhealthy,
Message: &message2,
},
},
},
},
},
},
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/nic",
Resources: []api.ResourceHealth{
{
ResourceID: "nic-1",
Health: api.ResourceHealthStatusHealthy,
Message: &message3,
},
},
},
},
},
},
}
}
podStatusWithoutMessage := func() *api.PodStatus {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "container1",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/device",
Resources: []api.ResourceHealth{
{
ResourceID: "device-1",
Health: api.ResourceHealthStatusHealthy,
Message: nil,
},
},
},
},
},
},
InitContainerStatuses: []api.ContainerStatus{
{
Name: "init-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/gpu",
Resources: []api.ResourceHealth{
{
ResourceID: "gpu-1",
Health: api.ResourceHealthStatusUnhealthy,
Message: nil,
},
},
},
},
},
},
EphemeralContainerStatuses: []api.ContainerStatus{
{
Name: "ephemeral-container",
AllocatedResourcesStatus: []api.ResourceStatus{
{
Name: "example.com/nic",
Resources: []api.ResourceHealth{
{
ResourceID: "nic-1",
Health: api.ResourceHealthStatusHealthy,
Message: nil,
},
},
},
},
},
},
}
}
tests := []struct {
name string
enabled bool
podStatus *api.PodStatus
oldPodStatus *api.PodStatus
wantPodStatus *api.PodStatus
}{
{
name: "feature enabled, old=without, new=without",
enabled: true,
oldPodStatus: podStatusWithoutMessage(),
podStatus: podStatusWithoutMessage(),
wantPodStatus: podStatusWithoutMessage(),
},
{
name: "feature enabled, old=with, new=with",
enabled: true,
oldPodStatus: podStatusWithMessage(),
podStatus: podStatusWithMessage(),
wantPodStatus: podStatusWithMessage(),
},
{
name: "feature enabled, old=without, new=with",
enabled: true,
oldPodStatus: podStatusWithoutMessage(),
podStatus: podStatusWithMessage(),
wantPodStatus: podStatusWithMessage(),
},
{
name: "feature disabled, old=without, new=without",
enabled: false,
oldPodStatus: podStatusWithoutMessage(),
podStatus: podStatusWithoutMessage(),
wantPodStatus: podStatusWithoutMessage(),
},
{
name: "feature disabled, old=without, new=with (should drop)",
enabled: false,
oldPodStatus: podStatusWithoutMessage(),
podStatus: podStatusWithMessage(),
wantPodStatus: podStatusWithoutMessage(),
},
{
name: "feature disabled, old=with, new=with (should preserve - bug fix)",
enabled: false,
oldPodStatus: podStatusWithMessage(),
podStatus: podStatusWithMessage(),
wantPodStatus: podStatusWithMessage(),
},
{
name: "feature disabled, old=with, new=without (should preserve nil)",
enabled: false,
oldPodStatus: podStatusWithMessage(),
podStatus: podStatusWithoutMessage(),
wantPodStatus: podStatusWithoutMessage(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Enable ResourceHealthStatus as well since ResourceHealthStatusMessage depends on it
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatusMessage, tt.enabled)
dropDisabledPodStatusFields(tt.podStatus, tt.oldPodStatus, &api.PodSpec{}, &api.PodSpec{})
if !reflect.DeepEqual(tt.podStatus, tt.wantPodStatus) {
t.Errorf("dropDisabledPodStatusFields() = %v, want %v\ndiff: %v",
tt.podStatus, tt.wantPodStatus, cmp.Diff(tt.wantPodStatus, tt.podStatus))
}
})
}
}

View file

@ -3075,6 +3075,9 @@ type ResourceHealth struct {
//
// In future we may want to introduce the PermanentlyUnhealthy Status.
Health ResourceHealthStatus
// Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold").
// This field is populated by the kubelet when ResourceHealthStatusMessage is enabled.
Message *string
}
// ContainerUser represents user identity information

View file

@ -7838,6 +7838,7 @@ func Convert_core_ResourceFieldSelector_To_v1_ResourceFieldSelector(in *core.Res
func autoConvert_v1_ResourceHealth_To_core_ResourceHealth(in *corev1.ResourceHealth, out *core.ResourceHealth, s conversion.Scope) error {
out.ResourceID = core.ResourceID(in.ResourceID)
out.Health = core.ResourceHealthStatus(in.Health)
out.Message = (*string)(unsafe.Pointer(in.Message))
return nil
}
@ -7849,6 +7850,7 @@ func Convert_v1_ResourceHealth_To_core_ResourceHealth(in *corev1.ResourceHealth,
func autoConvert_core_ResourceHealth_To_v1_ResourceHealth(in *core.ResourceHealth, out *corev1.ResourceHealth, s conversion.Scope) error {
out.ResourceID = corev1.ResourceID(in.ResourceID)
out.Health = corev1.ResourceHealthStatus(in.Health)
out.Message = (*string)(unsafe.Pointer(in.Message))
return nil
}

View file

@ -9524,6 +9524,14 @@ func validateContainerStatusAllocatedResourcesStatus(containerStatuses []core.Co
allErrors = append(allErrors, field.NotSupported(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("health"), r.Health, sets.List(supportedResourceHealthValues)))
}
if r.Message != nil {
if len(*r.Message) == 0 {
allErrors = append(allErrors, field.Required(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("message"), "must be non-empty if specified"))
} else if len(*r.Message) > v1.ResourceHealthMessageMaxLength {
allErrors = append(allErrors, field.TooLong(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("message"), *r.Message, v1.ResourceHealthMessageMaxLength))
}
}
if uniqueResources.Has(r.ResourceID) {
allErrors = append(allErrors, field.Duplicate(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("resourceID"), r.ResourceID))
} else {

View file

@ -27976,6 +27976,103 @@ func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) {
field.NotSupported(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("health"), core.ResourceHealthStatus("invalid-health-value"), []string{"Healthy", "Unhealthy", "Unknown"}),
},
},
"don't allow message longer than max length": {
containers: []core.Container{
{
Name: "container-1",
Resources: core.ResourceRequirements{
Requests: core.ResourceList{
"test.device/test": resource.MustParse("1"),
},
},
},
},
containerStatuses: []core.ContainerStatus{
{
Name: "container-1",
AllocatedResourcesStatus: []core.ResourceStatus{
{
Name: "test.device/test",
Resources: []core.ResourceHealth{
{
ResourceID: "resource-1",
Health: core.ResourceHealthStatusHealthy,
Message: ptr.To(string(make([]byte, v1.ResourceHealthMessageMaxLength+1))),
},
},
},
},
},
},
wantFieldErrors: field.ErrorList{
field.TooLong(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("message"), string(make([]byte, v1.ResourceHealthMessageMaxLength+1)), v1.ResourceHealthMessageMaxLength),
},
},
"allow message at exactly max length": {
containers: []core.Container{
{
Name: "container-1",
Resources: core.ResourceRequirements{
Requests: core.ResourceList{
"test.device/test": resource.MustParse("1"),
},
},
},
},
containerStatuses: []core.ContainerStatus{
{
Name: "container-1",
AllocatedResourcesStatus: []core.ResourceStatus{
{
Name: "test.device/test",
Resources: []core.ResourceHealth{
{
ResourceID: "resource-1",
Health: core.ResourceHealthStatusHealthy,
Message: ptr.To(string(make([]byte, v1.ResourceHealthMessageMaxLength))),
},
},
},
},
},
},
wantFieldErrors: field.ErrorList{},
},
"don't allow empty message": {
containers: []core.Container{
{
Name: "container-1",
Resources: core.ResourceRequirements{
Requests: core.ResourceList{
"test.device/test": resource.MustParse("1"),
},
},
},
},
containerStatuses: []core.ContainerStatus{
{
Name: "container-1",
AllocatedResourcesStatus: []core.ResourceStatus{
{
Name: "test.device/test",
Resources: []core.ResourceHealth{
{
ResourceID: "resource-1",
Health: core.ResourceHealthStatusHealthy,
Message: ptr.To(""),
},
},
},
},
},
},
wantFieldErrors: field.ErrorList{
field.Required(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("message"), "must be non-empty if specified"),
},
},
}
for name, tt := range testCases {
t.Run(name, func(t *testing.T) {

View file

@ -5240,6 +5240,11 @@ func (in *ResourceFieldSelector) DeepCopy() *ResourceFieldSelector {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceHealth) DeepCopyInto(out *ResourceHealth) {
*out = *in
if in.Message != nil {
in, out := &in.Message, &out.Message
*out = new(string)
**out = **in
}
return
}
@ -5440,7 +5445,9 @@ func (in *ResourceStatus) DeepCopyInto(out *ResourceStatus) {
if in.Resources != nil {
in, out := &in.Resources, &out.Resources
*out = make([]ResourceHealth, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}

View file

@ -813,6 +813,12 @@ const (
// Adds the AllocatedResourcesStatus to the container status.
ResourceHealthStatus featuregate.Feature = "ResourceHealthStatus"
// owner: @SergeyKanzhelev
// kep: https://kep.k8s.io/4680
//
// Adds a message to the AllocatedResourcesStatus entries.
ResourceHealthStatusMessage featuregate.Feature = "ResourceHealthStatusMessage"
// owner: @yuanwang04
// kep: https://kep.k8s.io/5532
//
@ -1708,6 +1714,11 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
ResourceHealthStatus: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
ResourceHealthStatusMessage: {
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
RestartAllContainersOnContainerExits: {
@ -2400,6 +2411,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
ResourceHealthStatus: {DynamicResourceAllocation},
ResourceHealthStatusMessage: {ResourceHealthStatus},
// RestartAllContainersOnContainerExits introduces a new container restart rule action.
// All restart rules will be dropped by API if ContainerRestartRules feature is not enabled.
RestartAllContainersOnContainerExits: {ContainerRestartRules, NodeDeclaredFeatures},

View file

@ -30200,6 +30200,13 @@ func schema_k8sio_api_core_v1_ResourceHealth(ref common.ReferenceCallback) commo
Format: "",
},
},
"message": {
SchemaProps: spec.SchemaProps{
Description: "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.",
Type: []string{"string"},
Format: "",
},
},
},
Required: []string{"resourceID"},
},

View file

@ -123,8 +123,13 @@ func (cache *healthInfoCache) saveToCheckpointInternal(logger klog.Logger) error
}
// getHealthInfo returns the current health info, adjusting for timeouts.
func (cache *healthInfoCache) getHealthInfo(driverName, poolName, deviceName string) state.DeviceHealthStatus {
res := state.DeviceHealthStatusUnknown
func (cache *healthInfoCache) getHealthInfo(driverName, poolName, deviceName string) state.DeviceHealth {
res := state.DeviceHealth{
PoolName: poolName,
DeviceName: deviceName,
Health: state.DeviceHealthStatusUnknown,
Message: "",
}
_ = cache.withRLock(func() error {
now := time.Now()
@ -139,9 +144,11 @@ func (cache *healthInfoCache) getHealthInfo(driverName, poolName, deviceName str
// Check if device health has timed out
if now.Sub(device.LastUpdated) > timeout {
res = state.DeviceHealthStatusUnknown
// Keep default Unknown status, clear message for stale device
res.Health = state.DeviceHealthStatusUnknown
res.Message = ""
} else {
res = device.Health
res = device
}
}
}
@ -176,7 +183,8 @@ func (cache *healthInfoCache) updateHealthInfo(logger klog.Logger, driverName st
existingDevice, ok := currentDriver.Devices[key]
if !ok || existingDevice.Health != reportedDevice.Health || existingDevice.HealthCheckTimeout != reportedDevice.HealthCheckTimeout {
// Consider health status, message, and timeout changes as updates
if !ok || existingDevice.Health != reportedDevice.Health || existingDevice.Message != reportedDevice.Message || existingDevice.HealthCheckTimeout != reportedDevice.HealthCheckTimeout {
changedDevices = append(changedDevices, reportedDevice)
}
@ -197,6 +205,7 @@ func (cache *healthInfoCache) updateHealthInfo(logger klog.Logger, driverName st
// Mark as unknown if the device health has timed out
if existingDevice.Health != state.DeviceHealthStatusUnknown && now.Sub(existingDevice.LastUpdated) > timeout {
existingDevice.Health = state.DeviceHealthStatusUnknown
existingDevice.Message = ""
existingDevice.LastUpdated = now
currentDriver.Devices[key] = existingDevice

View file

@ -200,12 +200,12 @@ func TestGetHealthInfo(t *testing.T) {
require.NoError(t, err)
// Initial state
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
// Add a device
_, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth})
require.NoError(t, err)
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
// Test timeout (simulated with old LastUpdated)
err = cache.withLock(func() error {
@ -218,7 +218,7 @@ func TestGetHealthInfo(t *testing.T) {
return nil
})
require.NoError(t, err)
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
}
// TestGetHealthInfoRobust tests retrieving health status logic solely & against many cases.
@ -353,7 +353,7 @@ func TestGetHealthInfoRobust(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := &healthInfoCache{HealthInfo: tt.initialState}
health := cache.getHealthInfo(tt.driverName, tt.poolName, tt.deviceName)
health := cache.getHealthInfo(tt.driverName, tt.poolName, tt.deviceName).Health
assert.Equal(t, tt.expectedHealth, health)
})
}
@ -372,7 +372,7 @@ func TestUpdateHealthInfo(t *testing.T) {
changedDevices, err := cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth})
require.NoError(t, err)
assertDeviceHealthElementsMatchIgnoreTime(t, expectedChanged1, changedDevices)
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
// 2 -- Update with no change
changedDevices, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth})
@ -386,7 +386,7 @@ func TestUpdateHealthInfo(t *testing.T) {
changedDevices, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{newHealth})
require.NoError(t, err)
assertDeviceHealthElementsMatchIgnoreTime(t, expectedChanged3, changedDevices)
assert.Equal(t, state.DeviceHealthStatusUnhealthy, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusUnhealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
// 4 -- Add second device, omit first
secondDevice := state.DeviceHealth{PoolName: testPool, DeviceName: "device2", Health: state.DeviceHealthStatusHealthy, HealthCheckTimeout: DefaultHealthTimeout}
@ -408,14 +408,14 @@ func TestUpdateHealthInfo(t *testing.T) {
changedDevices, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{secondDevice})
require.NoError(t, err)
assertDeviceHealthElementsMatchIgnoreTime(t, expectedChanged4, changedDevices)
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, "device2"))
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, "device2").Health)
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
// 5 -- Test persistence
cache2, err := newHealthInfoCache(logger, tmpFile)
require.NoError(t, err)
assert.Equal(t, state.DeviceHealthStatusHealthy, cache2.getHealthInfo(testDriver, testPool, "device2"))
assert.Equal(t, state.DeviceHealthStatusUnknown, cache2.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusHealthy, cache2.getHealthInfo(testDriver, testPool, "device2").Health)
assert.Equal(t, state.DeviceHealthStatusUnknown, cache2.getHealthInfo(testDriver, testPool, testDevice).Health)
// 6 -- Test how updateHealthInfo handles device timeouts
timeoutDevice := state.DeviceHealth{PoolName: testPool, DeviceName: "timeoutDevice", Health: "Unhealthy", HealthCheckTimeout: DefaultHealthTimeout}
@ -453,9 +453,9 @@ func TestClearDriver(t *testing.T) {
_, err = cache.updateHealthInfo(logger, testDriver, []state.DeviceHealth{testDeviceHealth})
require.NoError(t, err)
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusHealthy, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
err = cache.clearDriver(logger, testDriver)
require.NoError(t, err)
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice))
assert.Equal(t, state.DeviceHealthStatusUnknown, cache.getHealthInfo(testDriver, testPool, testDevice).Health)
}

View file

@ -22,6 +22,7 @@ import (
"fmt"
"io"
"path/filepath"
"slices"
"strconv"
"time"
@ -473,6 +474,16 @@ func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
return nil
}
// truncateHealthMessage truncates a health message to the maximum allowed length.
// If the message exceeds v1.ResourceHealthMessageMaxLength, it is truncated to
// (maxLength - 3) characters and "..." is appended.
func truncateHealthMessage(message string) string {
if len(message) <= v1.ResourceHealthMessageMaxLength {
return message
}
return message[:v1.ResourceHealthMessageMaxLength-3] + "..."
}
// GetResources gets a ContainerInfo object from the claimInfo cache.
// This information is used by the caller to update a container config.
func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
@ -830,10 +841,26 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat
// Iterate through the claims requested by this specific container.
for _, claim := range containerSpec.Resources.Claims {
// Find the actual name of the ResourceClaim object.
// Try the O(1) map lookup first, then fall back to resolving
// from the pod spec. The fallback is needed because
// ResourceClaimStatuses may not be populated yet when a
// health update triggers an early status sync.
actualClaimName := podClaimStatusMap[claim.Name]
if actualClaimName == "" {
logger.V(4).Info("Could not find generated name for resource claim in pod status", "container", containerSpec.Name, "claimName", claim.Name)
for i := range pod.Spec.ResourceClaims {
if pod.Spec.ResourceClaims[i].Name == claim.Name {
claimNamePtr, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
if err == nil && claimNamePtr != nil {
actualClaimName = *claimNamePtr
}
break
}
}
}
if actualClaimName == "" {
logger.V(4).Info("Could not find generated name for resource claim", "container", containerSpec.Name, "claimName", claim.Name)
continue
}
@ -871,26 +898,53 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat
// Clear previous health entries before adding current ones.
resStatus.Resources = []v1.ResourceHealth{}
// Use a map to track resourceIDs we've already added to avoid duplicates.
// Multiple devices in claimInfo.DriverState may have the same resourceID
// (e.g., when using the same CDI device ID), so we need to deduplicate.
seenResourceIDs := make(map[v1.ResourceID]bool)
for driverName, driverState := range claimInfo.DriverState {
for _, device := range driverState.Devices {
healthStr := m.healthInfoCache.getHealthInfo(driverName, device.PoolName, device.DeviceName)
// Skip devices that don't match the request we're reporting for.
// Use BaseRequestRef to handle subrequests (e.g., "request/subrequest" -> "request")
// since device.RequestNames are stored with subrequest suffixes stripped.
if claim.Request != "" && len(device.RequestNames) > 0 && !slices.Contains(device.RequestNames, resourceclaim.BaseRequestRef(claim.Request)) {
continue
}
healthInfo := m.healthInfoCache.getHealthInfo(driverName, device.PoolName, device.DeviceName)
var health v1.ResourceHealthStatus
switch healthStr {
case "Healthy":
switch healthInfo.Health {
case state.DeviceHealthStatusHealthy:
health = v1.ResourceHealthStatusHealthy
case "Unhealthy":
case state.DeviceHealthStatusUnhealthy:
health = v1.ResourceHealthStatusUnhealthy
default:
health = v1.ResourceHealthStatusUnknown
}
resourceHealth := v1.ResourceHealth{Health: health}
// Create the ResourceHealth entry
resourceHealth := v1.ResourceHealth{
Health: health,
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ResourceHealthStatusMessage) && len(healthInfo.Message) > 0 {
resourceHealth.Message = &healthInfo.Message
}
// Use first CDI device ID as ResourceID, with fallback
if len(device.CDIDeviceIDs) > 0 {
resourceHealth.ResourceID = v1.ResourceID(device.CDIDeviceIDs[0])
} else {
resourceHealth.ResourceID = v1.ResourceID(fmt.Sprintf("%s/%s/%s", driverName, device.PoolName, device.DeviceName))
}
// Skip if we've already added this resourceID
if seenResourceIDs[resourceHealth.ResourceID] {
continue
}
seenResourceIDs[resourceHealth.ResourceID] = true
resStatus.Resources = append(resStatus.Resources, resourceHealth)
}
}
@ -973,6 +1027,7 @@ func (m *Manager) HandleWatchResourcesStream(ctx context.Context, stream draheal
Health: health,
LastUpdated: time.Unix(d.GetLastUpdatedTime(), 0),
HealthCheckTimeout: timeout,
Message: truncateHealthMessage(d.GetMessage()),
}
}

View file

@ -50,6 +50,7 @@ import (
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/utils/ptr"
drahealthv1alpha1 "k8s.io/kubelet/pkg/apis/dra-health/v1alpha1"
drapb "k8s.io/kubelet/pkg/apis/dra/v1"
@ -57,7 +58,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
const (
@ -1746,7 +1746,7 @@ func TestHandleWatchResourcesStream(t *testing.T) {
}
// Check cache state
cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName)
cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName).Health
assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Cache update check failed")
t.Log("HealthChangeForAllocatedDevice: Closing responses channel to signal EOF")
@ -1804,7 +1804,7 @@ func TestHandleWatchResourcesStream(t *testing.T) {
}
// Check health cache for the "other-device"
cachedHealthOther := manager.healthInfoCache.getHealthInfo(driverName, poolName, "other-device")
cachedHealthOther := manager.healthInfoCache.getHealthInfo(driverName, poolName, "other-device").Health
assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealthOther, "Cache update for other-device failed")
close(responses)
@ -1989,7 +1989,7 @@ func TestHandleWatchResourcesStream(t *testing.T) {
// Verify health cache is updated correctly
cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName)
assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Health cache should be updated for device with ShareID")
assert.Equal(t, state.DeviceHealthStatusUnhealthy, cachedHealth.Health, "Health cache should be updated for device with ShareID")
// Verify the claim still has the ShareID set (it shouldn't be lost during health updates)
claimFromCache, exists := manager.cache.get(claimName, namespace)
@ -2112,7 +2112,7 @@ func TestHandleWatchResourcesStream(t *testing.T) {
// Verify health cache is updated
cachedHealth := manager.healthInfoCache.getHealthInfo(driverName, poolName, deviceName)
assert.Equal(t, state.DeviceHealthStatus("Unhealthy"), cachedHealth, "Health cache should show Unhealthy for shared device")
assert.Equal(t, state.DeviceHealthStatusUnhealthy, cachedHealth.Health, "Health cache should show Unhealthy for shared device")
t.Log("HealthChangeForMultiplePodsWithSharedDevice: Closing responses channel to signal EOF")
close(responses)
@ -2184,7 +2184,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
{
Name: "claim:claim1",
Resources: []v1.ResourceHealth{
{ResourceID: "test-driver/pool/dev-a", Health: v1.ResourceHealthStatusHealthy},
{ResourceID: "test-driver/pool/dev-a", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")},
},
},
},
@ -2219,7 +2219,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
{
Name: "claim:renamed-pod-claim",
Resources: []v1.ResourceHealth{
{ResourceID: "test-driver/pool/dev-b", Health: v1.ResourceHealthStatusHealthy},
{ResourceID: "test-driver/pool/dev-b", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")},
},
},
},
@ -2254,7 +2254,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
{
Name: "claim:templated-claim",
Resources: []v1.ResourceHealth{
{ResourceID: "test-driver/pool/dev-c", Health: v1.ResourceHealthStatusHealthy},
{ResourceID: "test-driver/pool/dev-c", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")},
},
},
},
@ -2299,7 +2299,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
{
Name: "claim:shareid-claim",
Resources: []v1.ResourceHealth{
{ResourceID: "test-driver/pool/dev-shared", Health: v1.ResourceHealthStatusHealthy},
{ResourceID: "test-driver/pool/dev-shared", Health: v1.ResourceHealthStatusHealthy, Message: ptr.To("Device is operating normally")},
},
},
},
@ -2308,6 +2308,7 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatusMessage, true)
tCtx := ktesting.Init(t)
logger := tCtx.Logger()
manager, err := NewManager(logger, nil, t.TempDir())
@ -2322,7 +2323,12 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
for _, ci := range tc.claimInfos {
for driverName, ds := range ci.DriverState {
for _, dev := range ds.Devices {
devices = append(devices, state.DeviceHealth{PoolName: dev.PoolName, DeviceName: dev.DeviceName, Health: state.DeviceHealthStatusHealthy})
devices = append(devices, state.DeviceHealth{
PoolName: dev.PoolName,
DeviceName: dev.DeviceName,
Health: state.DeviceHealthStatusHealthy,
Message: "Device is operating normally",
})
}
_, err := manager.healthInfoCache.updateHealthInfo(logger, driverName, devices)
require.NoError(t, err)
@ -2337,3 +2343,194 @@ func TestUpdateAllocatedResourcesStatus(t *testing.T) {
})
}
}
func TestUpdateAllocatedResourcesStatus_Subrequest(t *testing.T) {
directClaimName := "test-claim"
testCases := []struct {
name string
claimRequest string
deviceRequestNames []string
expectHealthReported bool
expectedHealthMessage string
expectedHealthStatus v1.ResourceHealthStatus
}{
{
name: "Subrequest matches base name",
claimRequest: "req-base/sub",
deviceRequestNames: []string{"req-base"},
expectHealthReported: true,
expectedHealthMessage: "Device healthy",
expectedHealthStatus: v1.ResourceHealthStatusHealthy,
},
{
name: "Subrequest does not match different base name",
claimRequest: "req-other/sub",
deviceRequestNames: []string{"req-base"},
expectHealthReported: false,
},
{
name: "Base request matches base name",
claimRequest: "req-base",
deviceRequestNames: []string{"req-base"},
expectHealthReported: true,
expectedHealthMessage: "Device healthy",
expectedHealthStatus: v1.ResourceHealthStatusHealthy,
},
{
name: "Empty request matches any device",
claimRequest: "",
deviceRequestNames: []string{"req-base"},
expectHealthReported: true,
expectedHealthMessage: "Device healthy",
expectedHealthStatus: v1.ResourceHealthStatusHealthy,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatusMessage, true)
tCtx := ktesting.Init(t)
manager, err := NewManager(tCtx.Logger(), nil, t.TempDir())
require.NoError(t, err)
// Setup claim info with device
claimInfo := &ClaimInfo{
ClaimInfoState: state.ClaimInfoState{
ClaimName: directClaimName,
PodUIDs: sets.New("test-pod-uid"),
DriverState: map[string]state.DriverState{
"test-driver": {
Devices: []state.Device{
{
PoolName: "test-pool",
DeviceName: "test-device",
RequestNames: tc.deviceRequestNames,
},
},
},
},
},
}
manager.cache.add(claimInfo)
// Setup health cache
devices := []state.DeviceHealth{
{
PoolName: "test-pool",
DeviceName: "test-device",
Health: state.DeviceHealthStatusHealthy,
Message: "Device healthy",
},
}
_, err = manager.healthInfoCache.updateHealthInfo(tCtx.Logger(), "test-driver", devices)
require.NoError(t, err)
// Create pod and status
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "test-pod", UID: "test-pod-uid"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container1",
Resources: v1.ResourceRequirements{
Claims: []v1.ResourceClaim{
{Name: "claim1", Request: tc.claimRequest},
},
},
},
},
ResourceClaims: []v1.PodResourceClaim{
{Name: "claim1", ResourceClaimName: &directClaimName},
},
},
Status: v1.PodStatus{
ResourceClaimStatuses: []v1.PodResourceClaimStatus{
{Name: "claim1", ResourceClaimName: &directClaimName},
},
},
}
status := &v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{Name: "container1"},
},
}
// Call UpdateAllocatedResourcesStatus
manager.UpdateAllocatedResourcesStatus(pod, status)
// Assert results
require.Len(t, status.ContainerStatuses, 1)
if tc.expectHealthReported {
// Should have one ResourceStatus entry
require.Len(t, status.ContainerStatuses[0].AllocatedResourcesStatus, 1)
resStatus := status.ContainerStatuses[0].AllocatedResourcesStatus[0]
// Should have one ResourceHealth entry
require.Len(t, resStatus.Resources, 1, "Expected exactly one ResourceHealth entry")
healthEntry := resStatus.Resources[0]
// Verify health matches what we put in the cache
assert.Equal(t, tc.expectedHealthStatus, healthEntry.Health)
assert.Equal(t, tc.expectedHealthMessage, ptr.Deref(healthEntry.Message, ""))
assert.Equal(t, v1.ResourceID("test-driver/test-pool/test-device"), healthEntry.ResourceID)
} else {
// Should have no ResourceStatus entries at all (empty statuses are pruned)
assert.Empty(t, status.ContainerStatuses[0].AllocatedResourcesStatus,
"Expected AllocatedResourcesStatus to be empty when device is filtered out")
}
})
}
}
func TestTruncateHealthMessage(t *testing.T) {
testCases := map[string]struct {
input string
expected string
}{
"empty message": {
input: "",
expected: "",
},
"short message": {
input: "Device is healthy",
expected: "Device is healthy",
},
"message at exactly max length": {
input: string(make([]byte, v1.ResourceHealthMessageMaxLength)),
expected: string(make([]byte, v1.ResourceHealthMessageMaxLength)),
},
"message one character over max length": {
input: string(make([]byte, v1.ResourceHealthMessageMaxLength+1)),
expected: string(make([]byte, v1.ResourceHealthMessageMaxLength-3)) + "...",
},
"very long message": {
input: string(make([]byte, v1.ResourceHealthMessageMaxLength*2)),
expected: string(make([]byte, v1.ResourceHealthMessageMaxLength-3)) + "...",
},
"message with meaningful content": {
input: "ECC error count exceeded threshold on device GPU-0",
expected: "ECC error count exceeded threshold on device GPU-0",
},
"long message with meaningful content": {
input: string(make([]byte, v1.ResourceHealthMessageMaxLength)) + " extra content",
expected: string(make([]byte, v1.ResourceHealthMessageMaxLength-3)) + "...",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := truncateHealthMessage(tc.input)
assert.Equal(t, tc.expected, result)
// Verify the result is never longer than max length
assert.LessOrEqual(t, len(result), v1.ResourceHealthMessageMaxLength)
// Verify that if truncation occurred, it ends with "..."
if len(tc.input) > v1.ResourceHealthMessageMaxLength {
assert.Len(t, result, v1.ResourceHealthMessageMaxLength)
assert.Equal(t, "...", result[len(result)-3:])
}
})
}
}

View file

@ -384,7 +384,7 @@ func (pm *DRAPluginManager) add(driverName string, endpoint string, chosenServic
}
p.conn = conn
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) && pm.streamHandler != nil {
pm.wg.Add(1)
go func() {
defer pm.wg.Done()

View file

@ -101,4 +101,8 @@ type DeviceHealth struct {
// Zero value means use the default timeout (DefaultHealthTimeout).
// This ensures backward compatibility with existing data.
HealthCheckTimeout time.Duration
// Message provides additional details about the device's health status.
// This field is optional and may be empty.
Message string
}

View file

@ -11564,6 +11564,13 @@ func (m *ResourceHealth) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Message != nil {
i -= len(*m.Message)
copy(dAtA[i:], *m.Message)
i = encodeVarintGenerated(dAtA, i, uint64(len(*m.Message)))
i--
dAtA[i] = 0x32
}
i -= len(m.Health)
copy(dAtA[i:], m.Health)
i = encodeVarintGenerated(dAtA, i, uint64(len(m.Health)))
@ -19140,6 +19147,10 @@ func (m *ResourceHealth) Size() (n int) {
n += 1 + l + sovGenerated(uint64(l))
l = len(m.Health)
n += 1 + l + sovGenerated(uint64(l))
if m.Message != nil {
l = len(*m.Message)
n += 1 + l + sovGenerated(uint64(l))
}
return n
}
@ -23478,6 +23489,7 @@ func (this *ResourceHealth) String() string {
s := strings.Join([]string{`&ResourceHealth{`,
`ResourceID:` + fmt.Sprintf("%v", this.ResourceID) + `,`,
`Health:` + fmt.Sprintf("%v", this.Health) + `,`,
`Message:` + valueToStringGenerated(this.Message) + `,`,
`}`,
}, "")
return s
@ -59695,6 +59707,39 @@ func (m *ResourceHealth) Unmarshal(dAtA []byte) error {
}
m.Health = ResourceHealthStatus(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGenerated
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthGenerated
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthGenerated
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
s := string(dAtA[iNdEx:postIndex])
m.Message = &s
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipGenerated(dAtA[iNdEx:])

View file

@ -5473,6 +5473,12 @@ message ResourceHealth {
//
// In future we may want to introduce the PermanentlyUnhealthy Status.
optional string health = 2;
// Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold").
// This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.
// +featureGate=ResourceHealthStatusMessage
// +optional
optional string message = 6;
}
// ResourceQuota sets aggregate quota restrictions enforced per namespace

View file

@ -3418,6 +3418,10 @@ const (
ResourceHealthStatusUnknown ResourceHealthStatus = "Unknown"
)
// ResourceHealthMessageMaxLength is the maximum length for ResourceHealth.Message field.
// Messages longer than this will be truncated with "..." appended.
const ResourceHealthMessageMaxLength = 1024
// ResourceID is calculated based on the source of this resource health information.
// For DevicePlugin:
//
@ -3445,6 +3449,11 @@ type ResourceHealth struct {
//
// In future we may want to introduce the PermanentlyUnhealthy Status.
Health ResourceHealthStatus `json:"health,omitempty" protobuf:"bytes,2,name=health"`
// Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold").
// This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.
// +featureGate=ResourceHealthStatusMessage
// +optional
Message *string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"`
}
// ContainerUser represents user identity information

View file

@ -2239,6 +2239,7 @@ var map_ResourceHealth = map[string]string{
"": "ResourceHealth represents the health of a resource. It has the latest device health information. This is a part of KEP https://kep.k8s.io/4680.",
"resourceID": "ResourceID is the unique identifier of the resource. See the ResourceID type for more information.",
"health": "Health of the resource. can be one of:\n - Healthy: operates as normal\n - Unhealthy: reported unhealthy. We consider this a temporary health issue\n since we do not have a mechanism today to distinguish\n temporary and permanent issues.\n - Unknown: The status cannot be determined.\n For example, Device Plugin got unregistered and hasn't been re-registered since.\n\nIn future we may want to introduce the PermanentlyUnhealthy Status.",
"message": "Message provides human-readable context for Health (e.g. \"ECC error count exceeded threshold\"). This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.",
}
func (ResourceHealth) SwaggerDoc() map[string]string {

View file

@ -5238,6 +5238,11 @@ func (in *ResourceFieldSelector) DeepCopy() *ResourceFieldSelector {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceHealth) DeepCopyInto(out *ResourceHealth) {
*out = *in
if in.Message != nil {
in, out := &in.Message, &out.Message
*out = new(string)
**out = **in
}
return
}
@ -5438,7 +5443,9 @@ func (in *ResourceStatus) DeepCopyInto(out *ResourceStatus) {
if in.Resources != nil {
in, out := &in.Resources, &out.Resources
*out = make([]ResourceHealth, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}

View file

@ -1915,7 +1915,8 @@
"resources": [
{
"resourceID": "resourceIDValue",
"health": "healthValue"
"health": "healthValue",
"message": "messageValue"
}
]
}
@ -2013,7 +2014,8 @@
"resources": [
{
"resourceID": "resourceIDValue",
"health": "healthValue"
"health": "healthValue",
"message": "messageValue"
}
]
}
@ -2112,7 +2114,8 @@
"resources": [
{
"resourceID": "resourceIDValue",
"health": "healthValue"
"health": "healthValue",
"message": "messageValue"
}
]
}

View file

@ -1247,6 +1247,7 @@ status:
- name: nameValue
resources:
- health: healthValue
message: messageValue
resourceID: resourceIDValue
containerID: containerIDValue
image: imageValue
@ -1313,6 +1314,7 @@ status:
- name: nameValue
resources:
- health: healthValue
message: messageValue
resourceID: resourceIDValue
containerID: containerIDValue
image: imageValue
@ -1388,6 +1390,7 @@ status:
- name: nameValue
resources:
- health: healthValue
message: messageValue
resourceID: resourceIDValue
containerID: containerIDValue
image: imageValue

View file

@ -163,7 +163,8 @@
"resources": [
{
"resourceID": "resourceIDValue",
"health": "healthValue"
"health": "healthValue",
"message": "messageValue"
}
]
}
@ -261,7 +262,8 @@
"resources": [
{
"resourceID": "resourceIDValue",
"health": "healthValue"
"health": "healthValue",
"message": "messageValue"
}
]
}
@ -360,7 +362,8 @@
"resources": [
{
"resourceID": "resourceIDValue",
"health": "healthValue"
"health": "healthValue",
"message": "messageValue"
}
]
}

View file

@ -50,6 +50,7 @@ status:
- name: nameValue
resources:
- health: healthValue
message: messageValue
resourceID: resourceIDValue
containerID: containerIDValue
image: imageValue
@ -116,6 +117,7 @@ status:
- name: nameValue
resources:
- health: healthValue
message: messageValue
resourceID: resourceIDValue
containerID: containerIDValue
image: imageValue
@ -191,6 +193,7 @@ status:
- name: nameValue
resources:
- health: healthValue
message: messageValue
resourceID: resourceIDValue
containerID: containerIDValue
image: imageValue

View file

@ -41,6 +41,9 @@ type ResourceHealthApplyConfiguration struct {
//
// In future we may want to introduce the PermanentlyUnhealthy Status.
Health *corev1.ResourceHealthStatus `json:"health,omitempty"`
// Message provides human-readable context for Health (e.g. "ECC error count exceeded threshold").
// This field is populated by the kubelet when ResourceHealthStatusMessage is enabled if the DRA plugin returns a message, and is null otherwise.
Message *string `json:"message,omitempty"`
}
// ResourceHealthApplyConfiguration constructs a declarative configuration of the ResourceHealth type for use with
@ -64,3 +67,11 @@ func (b *ResourceHealthApplyConfiguration) WithHealth(value corev1.ResourceHealt
b.Health = &value
return b
}
// WithMessage sets the Message field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the Message field is set to the value of the last call.
func (b *ResourceHealthApplyConfiguration) WithMessage(value string) *ResourceHealthApplyConfiguration {
b.Message = &value
return b
}

View file

@ -7519,6 +7519,9 @@ var schemaYAML = typed.YAMLObject(`types:
- name: health
type:
scalar: string
- name: message
type:
scalar: string
- name: resourceID
type:
scalar: string

View file

@ -208,8 +208,12 @@ type DeviceHealth struct {
// If not specified, zero, or negative, Kubelet will use a default timeout.
// Negative values will be logged and treated as if not specified.
HealthCheckTimeoutSeconds int64 `protobuf:"varint,4,opt,name=health_check_timeout_seconds,json=healthCheckTimeoutSeconds,proto3" json:"health_check_timeout_seconds,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
// An optional message providing additional details about the device's health.
// Maximum length is 1024 characters. Messages longer than 1024 characters
// will be truncated to 1021 characters with "..." appended.
Message string `protobuf:"bytes,5,opt,name=message,proto3" json:"message,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DeviceHealth) Reset() {
@ -270,6 +274,13 @@ func (x *DeviceHealth) GetHealthCheckTimeoutSeconds() int64 {
return 0
}
func (x *DeviceHealth) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
// NodeWatchResourcesResponse contains a list of devices and their current health.
// This should be a complete list for the driver; Kubelet will reconcile this
// state with its internal cache. Any devices managed by the driver that are
@ -332,7 +343,7 @@ var file_staging_src_k8s_io_kubelet_pkg_apis_dra_health_v1alpha1_api_proto_rawDe
0x0a, 0x09, 0x70, 0x6f, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x70, 0x6f, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x64,
0x65, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xdf, 0x01, 0x0a,
0x52, 0x0a, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xf9, 0x01, 0x0a,
0x0c, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x32, 0x0a,
0x06, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x49,
@ -346,27 +357,29 @@ var file_staging_src_k8s_io_kubelet_pkg_apis_dra_health_v1alpha1_api_proto_rawDe
0x1c, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x5f, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x04, 0x20,
0x01, 0x28, 0x03, 0x52, 0x19, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b,
0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x22, 0x4e,
0x0a, 0x1a, 0x4e, 0x6f, 0x64, 0x65, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75,
0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x07,
0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e,
0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x48,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2a, 0x37,
0x0a, 0x0c, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b,
0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x48,
0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, 0x48, 0x45,
0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x02, 0x32, 0x78, 0x0a, 0x11, 0x44, 0x52, 0x41, 0x52, 0x65,
0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x63, 0x0a, 0x12,
0x4e, 0x6f, 0x64, 0x65, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
0x65, 0x73, 0x12, 0x23, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4e, 0x6f,
0x64, 0x65, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68,
0x61, 0x31, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f,
0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30,
0x01, 0x42, 0x2d, 0x5a, 0x2b, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x75, 0x62, 0x65,
0x6c, 0x65, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x64, 0x72, 0x61,
0x2d, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x18,
0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x4e, 0x0a, 0x1a, 0x4e, 0x6f, 0x64, 0x65,
0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65,
0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68,
0x61, 0x31, 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52,
0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2a, 0x37, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x6c,
0x74, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e,
0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59,
0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10,
0x02, 0x32, 0x78, 0x0a, 0x11, 0x44, 0x52, 0x41, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65,
0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x63, 0x0a, 0x12, 0x4e, 0x6f, 0x64, 0x65, 0x57, 0x61,
0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x23, 0x2e, 0x76,
0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x57, 0x61, 0x74, 0x63,
0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x24, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4e, 0x6f, 0x64,
0x65, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x2d, 0x5a, 0x2b, 0x6b,
0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x6c, 0x65, 0x74, 0x2f, 0x70, 0x6b,
0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x64, 0x72, 0x61, 0x2d, 0x68, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
})
var (

View file

@ -61,6 +61,11 @@ message DeviceHealth {
// If not specified, zero, or negative, Kubelet will use a default timeout.
// Negative values will be logged and treated as if not specified.
int64 health_check_timeout_seconds = 4;
// An optional message providing additional details about the device's health.
// Maximum length is 1024 characters. Messages longer than 1024 characters
// will be truncated to 1021 characters with "..." appended.
string message = 5;
}
// NodeWatchResourcesResponse contains a list of devices and their current health.

View file

@ -157,7 +157,8 @@
| ReloadKubeletServerCertificateFile | :ballot_box_with_check:&nbsp;1.31+ | | | 1.31 | | | | [code](https://cs.k8s.io/?q=%5CbReloadKubeletServerCertificateFile%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbReloadKubeletServerCertificateFile%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| RemoteRequestHeaderUID | :ballot_box_with_check:&nbsp;1.33+ | | 1.32 | 1.33 | | | | [code](https://cs.k8s.io/?q=%5CbRemoteRequestHeaderUID%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRemoteRequestHeaderUID%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| ResilientWatchCacheInitialization | :ballot_box_with_check:&nbsp;1.31+ | :closed_lock_with_key:&nbsp;1.34+ | | 1.311.33 | 1.34 | | | [code](https://cs.k8s.io/?q=%5CbResilientWatchCacheInitialization%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbResilientWatchCacheInitialization%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| ResourceHealthStatus | | | 1.31 | | | | DynamicResourceAllocation | [code](https://cs.k8s.io/?q=%5CbResourceHealthStatus%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbResourceHealthStatus%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| ResourceHealthStatus | :ballot_box_with_check:&nbsp;1.36+ | | 1.311.35 | 1.36 | | | DynamicResourceAllocation | [code](https://cs.k8s.io/?q=%5CbResourceHealthStatus%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbResourceHealthStatus%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| ResourceHealthStatusMessage | :ballot_box_with_check:&nbsp;1.36+ | | | 1.36 | | | ResourceHealthStatus | [code](https://cs.k8s.io/?q=%5CbResourceHealthStatusMessage%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbResourceHealthStatusMessage%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| RestartAllContainersOnContainerExits | | | 1.35 | | | | ContainerRestartRules<br>NodeDeclaredFeatures | [code](https://cs.k8s.io/?q=%5CbRestartAllContainersOnContainerExits%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRestartAllContainersOnContainerExits%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| RetryGenerateName | :ballot_box_with_check:&nbsp;1.31+ | :closed_lock_with_key:&nbsp;1.32+ | 1.30 | 1.31 | 1.32 | | | [code](https://cs.k8s.io/?q=%5CbRetryGenerateName%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRetryGenerateName%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| RotateKubeletServerCertificate | :ballot_box_with_check:&nbsp;1.12+ | | 1.71.11 | 1.12 | | | | [code](https://cs.k8s.io/?q=%5CbRotateKubeletServerCertificate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbRotateKubeletServerCertificate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |

View file

@ -1513,6 +1513,16 @@
lockToDefault: false
preRelease: Alpha
version: "1.31"
- default: true
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: ResourceHealthStatusMessage
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: RestartAllContainersOnContainerExits
versionedSpecs:
- default: false

View file

@ -3135,4 +3135,191 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
multipleDriversContext("using only drapbv1", false, true)
multipleDriversContext("using drapbv1beta1 and drapbv1", true, true)
})
// Test for device health reporting with custom messages
framework.Context("kubelet", feature.DynamicResourceAllocation, func() {
nodes := drautils.NewNodes(f, 1, 1)
driver := drautils.NewDriver(f, nodes, drautils.NetworkResources(10, false))
b := drautils.NewBuilder(f, driver)
f.It("should report device health with custom messages", f.WithLabel("KubeletMinVersion:1.36"), framework.WithFeatureGate(features.ResourceHealthStatusMessage), func(ctx context.Context) {
claimName := "health-test-claim"
claim := b.ExternalClaim()
claim.Name = claimName
pod := b.PodExternal(claimName)
pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
b.Create(f.TContext(ctx), claim, pod)
// Wait for pod to start running
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod")
// Get the pool name and device name from the allocated claim
allocatedClaim, err := f.ClientSet.ResourceV1().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
gomega.Expect(allocatedClaim.Status.Allocation).ToNot(gomega.BeNil())
gomega.Expect(allocatedClaim.Status.Allocation.Devices.Results).To(gomega.HaveLen(1))
poolName := allocatedClaim.Status.Allocation.Devices.Results[0].Pool
deviceName := allocatedClaim.Status.Allocation.Devices.Results[0].Device
// Get the plugin for the node where the pod is scheduled
scheduledPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
plugin, ok := driver.Nodes[scheduledPod.Spec.NodeName]
if !ok {
framework.Failf("pod got scheduled to node %s without a plugin", scheduledPod.Spec.NodeName)
}
// Test 1: Set and check initial healthy status with message
ginkgo.By("Setting initial healthy status with message")
plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Healthy",
Message: "Device operating normally, temperature: 45°C",
}
ginkgo.By("Checking initial healthy status with message")
gomega.Eventually(ctx, func(ctx context.Context) string {
updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return ""
}
for _, cs := range updatedPod.Status.ContainerStatuses {
for _, rs := range cs.AllocatedResourcesStatus {
if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) {
for _, rh := range rs.Resources {
return ptr.Deref(rh.Message, "")
}
}
}
}
return ""
}).WithTimeout(30 * time.Second).Should(gomega.Equal("Device operating normally, temperature: 45°C"))
// Test 2: Mark device as unhealthy with error message
ginkgo.By("Marking device as unhealthy with error message")
plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Unhealthy",
Message: "ECC error count exceeded threshold (15 errors in last hour)",
}
// First Eventually: Check health status
gomega.Eventually(ctx, func(ctx context.Context) v1.ResourceHealthStatus {
updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return ""
}
for _, cs := range updatedPod.Status.ContainerStatuses {
for _, rs := range cs.AllocatedResourcesStatus {
if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) {
for _, rh := range rs.Resources {
return rh.Health
}
}
}
}
return ""
}).WithTimeout(30 * time.Second).Should(gomega.Equal(v1.ResourceHealthStatusUnhealthy))
// Second Eventually: Check message
gomega.Eventually(ctx, func(ctx context.Context) string {
updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return ""
}
for _, cs := range updatedPod.Status.ContainerStatuses {
for _, rs := range cs.AllocatedResourcesStatus {
if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) {
for _, rh := range rs.Resources {
return ptr.Deref(rh.Message, "")
}
}
}
}
return ""
}).WithTimeout(30 * time.Second).Should(gomega.Equal("ECC error count exceeded threshold (15 errors in last hour)"))
// Test 3: Update message while still unhealthy
ginkgo.By("Updating message while device remains unhealthy")
plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Unhealthy",
Message: "Critical: Memory corruption detected",
}
gomega.Eventually(ctx, func(ctx context.Context) string {
updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return ""
}
for _, cs := range updatedPod.Status.ContainerStatuses {
for _, rs := range cs.AllocatedResourcesStatus {
if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) {
for _, rh := range rs.Resources {
if rh.Health == v1.ResourceHealthStatusUnhealthy {
return ptr.Deref(rh.Message, "")
}
}
}
}
}
return ""
}).WithTimeout(30 * time.Second).Should(gomega.Equal("Critical: Memory corruption detected"))
// Test 4: Mark device as healthy with recovery message
ginkgo.By("Marking device as healthy with recovery message")
plugin.HealthControlChan <- testdriverapp.DeviceHealthUpdate{
PoolName: poolName,
DeviceName: deviceName,
Health: "Healthy",
Message: "Recovered after reset, all diagnostics passed",
}
// First Eventually: Check health status
gomega.Eventually(ctx, func(ctx context.Context) v1.ResourceHealthStatus {
updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return ""
}
for _, cs := range updatedPod.Status.ContainerStatuses {
for _, rs := range cs.AllocatedResourcesStatus {
if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) {
for _, rh := range rs.Resources {
return rh.Health
}
}
}
}
return ""
}).WithTimeout(30 * time.Second).Should(gomega.Equal(v1.ResourceHealthStatusHealthy))
// Second Eventually: Check message
gomega.Eventually(ctx, func(ctx context.Context) string {
updatedPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return ""
}
for _, cs := range updatedPod.Status.ContainerStatuses {
for _, rs := range cs.AllocatedResourcesStatus {
if rs.Name == v1.ResourceName(fmt.Sprintf("claim:%s", pod.Spec.ResourceClaims[0].Name)) {
for _, rh := range rs.Resources {
return ptr.Deref(rh.Message, "")
}
}
}
}
return ""
}).WithTimeout(30 * time.Second).Should(gomega.Equal("Recovered after reset, all diagnostics passed"))
})
})
})

View file

@ -54,6 +54,12 @@ type DeviceHealthUpdate struct {
PoolName string
DeviceName string
Health string
Message string
}
type deviceHealthInfo struct {
status string
message string
}
type ExamplePlugin struct {
@ -76,7 +82,7 @@ type ExamplePlugin struct {
gRPCCalls []GRPCCall
healthMutex sync.Mutex
deviceHealth map[string]string
deviceHealth map[string]deviceHealthInfo
HealthControlChan chan DeviceHealthUpdate
blockPrepareResourcesMutex sync.Mutex
@ -205,7 +211,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
nodeName: nodeName,
prepared: make(map[ClaimID][]kubeletplugin.Device),
cancelMainContext: testOpts.cancelMainContext,
deviceHealth: make(map[string]string),
deviceHealth: make(map[string]deviceHealthInfo),
HealthControlChan: make(chan DeviceHealthUpdate, 10),
}
@ -633,7 +639,10 @@ func (ex *ExamplePlugin) NodeWatchResources(req *drahealthv1alpha1.NodeWatchReso
logger.V(3).Info("Received health update from control channel", "update", update)
ex.healthMutex.Lock()
key := update.PoolName + "/" + update.DeviceName
ex.deviceHealth[key] = update.Health
ex.deviceHealth[key] = deviceHealthInfo{
status: update.Health,
message: update.Message,
}
ex.healthMutex.Unlock()
if err := ex.sendHealthUpdate(srv); err != nil {
@ -657,7 +666,7 @@ func (ex *ExamplePlugin) sendHealthUpdate(srv drahealthv1alpha1.DRAResourceHealt
healthUpdates := []*drahealthv1alpha1.DeviceHealth{}
ex.healthMutex.Lock()
for key, health := range ex.deviceHealth {
for key, healthInfo := range ex.deviceHealth {
parts := strings.SplitN(key, "/", 2)
if len(parts) != 2 {
continue
@ -666,7 +675,7 @@ func (ex *ExamplePlugin) sendHealthUpdate(srv drahealthv1alpha1.DRAResourceHealt
deviceName := parts[1]
var healthEnum drahealthv1alpha1.HealthStatus
switch health {
switch healthInfo.status {
case "Healthy":
healthEnum = drahealthv1alpha1.HealthStatus_HEALTHY
case "Unhealthy":
@ -682,6 +691,7 @@ func (ex *ExamplePlugin) sendHealthUpdate(srv drahealthv1alpha1.DRAResourceHealt
},
Health: healthEnum,
LastUpdatedTime: time.Now().Unix(),
Message: healthInfo.message,
})
}
ex.healthMutex.Unlock()

View file

@ -662,6 +662,7 @@ func (d *Driver) SetUp(tCtx ktesting.TContext, kubeletRootDir string, nodes *Nod
}
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
app.Options{EnableHealthService: true},
kubeletplugin.GRPCVerbosity(0),
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return d.interceptor(nodename, ctx, req, info, handler)