Merge pull request #137790 from philhuan/kubelet-dra-health-refactor

kubelet/dra: refactor health helpers and add tests
This commit is contained in:
Kubernetes Prow Robot 2026-03-18 14:26:39 +05:30 committed by GitHub
commit 4e859569e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 308 additions and 58 deletions

View file

@ -806,6 +806,7 @@ func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) (
func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
logger := klog.FromContext(context.Background()).WithName("dra-manager")
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
enableHealthMessage := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ResourceHealthStatusMessage)
for i := range status.ContainerStatuses {
containerStatus := &status.ContainerStatuses[i]
@ -913,31 +914,12 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat
}
healthInfo := m.healthInfoCache.getHealthInfo(driverName, device.PoolName, device.DeviceName)
var health v1.ResourceHealthStatus
switch healthInfo.Health {
case state.DeviceHealthStatusHealthy:
health = v1.ResourceHealthStatusHealthy
case state.DeviceHealthStatusUnhealthy:
health = v1.ResourceHealthStatusUnhealthy
default:
health = v1.ResourceHealthStatusUnknown
}
// Create the ResourceHealth entry
resourceHealth := v1.ResourceHealth{
Health: health,
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ResourceHealthStatusMessage) && len(healthInfo.Message) > 0 {
resourceHealth.Message = &healthInfo.Message
}
// Use first CDI device ID as ResourceID, with fallback
if len(device.CDIDeviceIDs) > 0 {
resourceHealth.ResourceID = v1.ResourceID(device.CDIDeviceIDs[0])
} else {
resourceHealth.ResourceID = v1.ResourceID(fmt.Sprintf("%s/%s/%s", driverName, device.PoolName, device.DeviceName))
}
resourceHealth := buildResourceHealth(
driverName,
device,
healthInfo,
enableHealthMessage,
)
// Skip if we've already added this resourceID
if seenResourceIDs[resourceHealth.ResourceID] {
@ -962,6 +944,70 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat
}
}
func toResourceHealthStatus(health state.DeviceHealthStatus) v1.ResourceHealthStatus {
switch health {
case state.DeviceHealthStatusHealthy:
return v1.ResourceHealthStatusHealthy
case state.DeviceHealthStatusUnhealthy:
return v1.ResourceHealthStatusUnhealthy
default:
return v1.ResourceHealthStatusUnknown
}
}
func buildResourceHealth(driverName string, device state.Device, healthInfo state.DeviceHealth, enableMessage bool) v1.ResourceHealth {
// Create the ResourceHealth entry
resourceHealth := v1.ResourceHealth{
Health: toResourceHealthStatus(healthInfo.Health),
}
if enableMessage && len(healthInfo.Message) > 0 {
resourceHealth.Message = &healthInfo.Message
}
// Use first CDI device ID as ResourceID, with fallback
if len(device.CDIDeviceIDs) > 0 {
resourceHealth.ResourceID = v1.ResourceID(device.CDIDeviceIDs[0])
} else {
resourceHealth.ResourceID = v1.ResourceID(fmt.Sprintf("%s/%s/%s", driverName, device.PoolName, device.DeviceName))
}
return resourceHealth
}
func toDeviceHealthStatus(health drahealthv1alpha1.HealthStatus) state.DeviceHealthStatus {
switch health {
case drahealthv1alpha1.HealthStatus_HEALTHY:
return state.DeviceHealthStatusHealthy
case drahealthv1alpha1.HealthStatus_UNHEALTHY:
return state.DeviceHealthStatusUnhealthy
default:
return state.DeviceHealthStatusUnknown
}
}
func buildDeviceHealth(logger klog.Logger, device *drahealthv1alpha1.DeviceHealth) state.DeviceHealth {
// Extract the health check timeout from the gRPC response
// If not specified, zero, or negative, use the default timeout
timeout := DefaultHealthTimeout
timeoutSeconds := device.GetHealthCheckTimeoutSeconds()
if timeoutSeconds > 0 {
timeout = time.Duration(timeoutSeconds) * time.Second
} else if timeoutSeconds < 0 {
// Log warning for negative timeout values and use default
logger.V(4).Info("Ignoring negative health check timeout, using default",
"poolName", device.GetDevice().GetPoolName(),
"deviceName", device.GetDevice().GetDeviceName(),
"providedTimeout", timeoutSeconds,
"defaultTimeout", DefaultHealthTimeout)
}
return state.DeviceHealth{
PoolName: device.GetDevice().GetPoolName(),
DeviceName: device.GetDevice().GetDeviceName(),
Health: toDeviceHealthStatus(device.GetHealth()),
LastUpdated: time.Unix(device.GetLastUpdatedTime(), 0),
HealthCheckTimeout: timeout,
Message: truncateHealthMessage(device.GetMessage()),
}
}
// HandleWatchResourcesStream processes health updates from the DRA plugin.
func (m *Manager) HandleWatchResourcesStream(ctx context.Context, stream drahealthv1alpha1.DRAResourceHealth_NodeWatchResourcesClient, pluginName string) error {
logger := klog.FromContext(ctx).WithName("dra-manager")
@ -996,39 +1042,7 @@ func (m *Manager) HandleWatchResourcesStream(ctx context.Context, stream draheal
// Convert drahealthv1alpha1.DeviceHealth to state.DeviceHealth
devices := make([]state.DeviceHealth, len(resp.GetDevices()))
for i, d := range resp.GetDevices() {
var health state.DeviceHealthStatus
switch d.GetHealth() {
case drahealthv1alpha1.HealthStatus_HEALTHY:
health = state.DeviceHealthStatusHealthy
case drahealthv1alpha1.HealthStatus_UNHEALTHY:
health = state.DeviceHealthStatusUnhealthy
default:
health = state.DeviceHealthStatusUnknown
}
// Extract the health check timeout from the gRPC response
// If not specified, zero, or negative, use the default timeout
timeout := DefaultHealthTimeout
timeoutSeconds := d.GetHealthCheckTimeoutSeconds()
if timeoutSeconds > 0 {
timeout = time.Duration(timeoutSeconds) * time.Second
} else if timeoutSeconds < 0 {
// Log warning for negative timeout values and use default
logger.V(4).Info("Ignoring negative health check timeout, using default",
"poolName", d.GetDevice().GetPoolName(),
"deviceName", d.GetDevice().GetDeviceName(),
"providedTimeout", timeoutSeconds,
"defaultTimeout", DefaultHealthTimeout)
}
devices[i] = state.DeviceHealth{
PoolName: d.GetDevice().GetPoolName(),
DeviceName: d.GetDevice().GetDeviceName(),
Health: health,
LastUpdated: time.Unix(d.GetLastUpdatedTime(), 0),
HealthCheckTimeout: timeout,
Message: truncateHealthMessage(d.GetMessage()),
}
devices[i] = buildDeviceHealth(logger, d)
}
changedDevices, updateErr := m.healthInfoCache.updateHealthInfo(logger, pluginName, devices)

View file

@ -2534,3 +2534,239 @@ func TestTruncateHealthMessage(t *testing.T) {
})
}
}
func TestBuildResourceHealth(t *testing.T) {
testCases := map[string]struct {
device state.Device
healthInfo state.DeviceHealth
enableMessage bool
wantHealth v1.ResourceHealthStatus
wantResource v1.ResourceID
wantMessage *string
}{
"message enabled and CDI id": {
device: state.Device{
PoolName: "pool",
DeviceName: "device",
CDIDeviceIDs: []string{"driver/pool=device"},
},
healthInfo: state.DeviceHealth{
Health: state.DeviceHealthStatusHealthy,
Message: "device ok",
},
enableMessage: true,
wantHealth: v1.ResourceHealthStatusHealthy,
wantResource: v1.ResourceID("driver/pool=device"),
wantMessage: ptr.To("device ok"),
},
"message disabled": {
device: state.Device{
PoolName: "pool",
DeviceName: "device",
CDIDeviceIDs: []string{"driver/pool=device"},
},
healthInfo: state.DeviceHealth{
Health: state.DeviceHealthStatusHealthy,
Message: "device ok",
},
enableMessage: false,
wantHealth: v1.ResourceHealthStatusHealthy,
wantResource: v1.ResourceID("driver/pool=device"),
wantMessage: nil,
},
"empty message": {
device: state.Device{
PoolName: "pool",
DeviceName: "device",
CDIDeviceIDs: []string{"driver/pool=device"},
},
healthInfo: state.DeviceHealth{
Health: state.DeviceHealthStatusHealthy,
Message: "",
},
enableMessage: true,
wantHealth: v1.ResourceHealthStatusHealthy,
wantResource: v1.ResourceID("driver/pool=device"),
wantMessage: nil,
},
"unhealthy status": {
device: state.Device{
PoolName: "pool",
DeviceName: "device",
CDIDeviceIDs: []string{"driver/pool=device"},
},
healthInfo: state.DeviceHealth{
Health: state.DeviceHealthStatusUnhealthy,
Message: "device ok",
},
enableMessage: true,
wantHealth: v1.ResourceHealthStatusUnhealthy,
wantResource: v1.ResourceID("driver/pool=device"),
wantMessage: ptr.To("device ok"),
},
"unknown status": {
device: state.Device{
PoolName: "pool",
DeviceName: "device",
CDIDeviceIDs: []string{"driver/pool=device"},
},
healthInfo: state.DeviceHealth{
Health: state.DeviceHealthStatusUnknown,
Message: "device ok",
},
enableMessage: true,
wantHealth: v1.ResourceHealthStatusUnknown,
wantResource: v1.ResourceID("driver/pool=device"),
wantMessage: ptr.To("device ok"),
},
"fallback resource id": {
device: state.Device{
PoolName: "pool",
DeviceName: "device",
},
healthInfo: state.DeviceHealth{
Health: state.DeviceHealthStatusUnhealthy,
Message: "device ok",
},
enableMessage: true,
wantHealth: v1.ResourceHealthStatusUnhealthy,
wantResource: v1.ResourceID("driver/pool/device"),
wantMessage: ptr.To("device ok"),
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
got := buildResourceHealth("driver", tc.device, tc.healthInfo, tc.enableMessage)
assert.Equal(t, tc.wantHealth, got.Health)
assert.Equal(t, tc.wantResource, got.ResourceID)
assert.Equal(t, tc.wantMessage, got.Message)
})
}
}
func TestToResourceHealthStatus(t *testing.T) {
testCases := map[string]struct {
input state.DeviceHealthStatus
expected v1.ResourceHealthStatus
}{
"healthy": {
input: state.DeviceHealthStatusHealthy,
expected: v1.ResourceHealthStatusHealthy,
},
"unhealthy": {
input: state.DeviceHealthStatusUnhealthy,
expected: v1.ResourceHealthStatusUnhealthy,
},
"unknown": {
input: state.DeviceHealthStatusUnknown,
expected: v1.ResourceHealthStatusUnknown,
},
"empty": {
input: state.DeviceHealthStatus(""),
expected: v1.ResourceHealthStatusUnknown,
},
"unexpected": {
input: state.DeviceHealthStatus("Other"),
expected: v1.ResourceHealthStatusUnknown,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, toResourceHealthStatus(tc.input))
})
}
}
func TestToDeviceHealthStatus(t *testing.T) {
testCases := map[string]struct {
input drahealthv1alpha1.HealthStatus
expected state.DeviceHealthStatus
}{
"healthy": {
input: drahealthv1alpha1.HealthStatus_HEALTHY,
expected: state.DeviceHealthStatusHealthy,
},
"unhealthy": {
input: drahealthv1alpha1.HealthStatus_UNHEALTHY,
expected: state.DeviceHealthStatusUnhealthy,
},
"unknown": {
input: drahealthv1alpha1.HealthStatus_UNKNOWN,
expected: state.DeviceHealthStatusUnknown,
},
"unexpected": {
input: drahealthv1alpha1.HealthStatus(99),
expected: state.DeviceHealthStatusUnknown,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, toDeviceHealthStatus(tc.input))
})
}
}
func TestBuildDeviceHealth(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
longMessage := strings.Repeat("a", v1.ResourceHealthMessageMaxLength+5)
testCases := map[string]struct {
health drahealthv1alpha1.HealthStatus
timeoutSeconds int64
message string
wantHealth state.DeviceHealthStatus
wantTimeout time.Duration
wantMessage string
}{
"healthy with positive timeout": {
health: drahealthv1alpha1.HealthStatus_HEALTHY,
timeoutSeconds: 12,
message: "ok",
wantHealth: state.DeviceHealthStatusHealthy,
wantTimeout: 12 * time.Second,
wantMessage: "ok",
},
"unhealthy with zero timeout": {
health: drahealthv1alpha1.HealthStatus_UNHEALTHY,
timeoutSeconds: 0,
message: "fail",
wantHealth: state.DeviceHealthStatusUnhealthy,
wantTimeout: DefaultHealthTimeout,
wantMessage: "fail",
},
"unknown with negative timeout": {
health: drahealthv1alpha1.HealthStatus_UNKNOWN,
timeoutSeconds: -1,
message: longMessage,
wantHealth: state.DeviceHealthStatusUnknown,
wantTimeout: DefaultHealthTimeout,
wantMessage: truncateHealthMessage(longMessage),
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
grpcDevice := &drahealthv1alpha1.DeviceHealth{
Device: &drahealthv1alpha1.DeviceIdentifier{
PoolName: "pool",
DeviceName: "device",
},
Health: tc.health,
LastUpdatedTime: 123,
HealthCheckTimeoutSeconds: tc.timeoutSeconds,
Message: tc.message,
}
got := buildDeviceHealth(logger, grpcDevice)
assert.Equal(t, "pool", got.PoolName)
assert.Equal(t, "device", got.DeviceName)
assert.Equal(t, tc.wantHealth, got.Health)
assert.Equal(t, time.Unix(123, 0), got.LastUpdated)
assert.Equal(t, tc.wantTimeout, got.HealthCheckTimeout)
assert.Equal(t, tc.wantMessage, got.Message)
})
}
}