From ce9df0b9e994a066a8a4c8cf35018f1825a5fd52 Mon Sep 17 00:00:00 2001 From: philhuan Date: Tue, 17 Mar 2026 09:15:14 +0800 Subject: [PATCH] kubelet/dra: refactor health helpers and add tests kubelet/dra: extend helper test coverage --- pkg/kubelet/cm/dra/manager.go | 130 +++++++++------- pkg/kubelet/cm/dra/manager_test.go | 236 +++++++++++++++++++++++++++++ 2 files changed, 308 insertions(+), 58 deletions(-) diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 15650df6ba0..887a75f7fe6 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -806,6 +806,7 @@ func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ( func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { logger := klog.FromContext(context.Background()).WithName("dra-manager") logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) + enableHealthMessage := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ResourceHealthStatusMessage) for i := range status.ContainerStatuses { containerStatus := &status.ContainerStatuses[i] @@ -913,31 +914,12 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat } healthInfo := m.healthInfoCache.getHealthInfo(driverName, device.PoolName, device.DeviceName) - - var health v1.ResourceHealthStatus - switch healthInfo.Health { - case state.DeviceHealthStatusHealthy: - health = v1.ResourceHealthStatusHealthy - case state.DeviceHealthStatusUnhealthy: - health = v1.ResourceHealthStatusUnhealthy - default: - health = v1.ResourceHealthStatusUnknown - } - - // Create the ResourceHealth entry - resourceHealth := v1.ResourceHealth{ - Health: health, - } - if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ResourceHealthStatusMessage) && len(healthInfo.Message) > 0 { - resourceHealth.Message = &healthInfo.Message - } - - // Use first CDI device ID as ResourceID, with fallback - if len(device.CDIDeviceIDs) > 0 { - resourceHealth.ResourceID = v1.ResourceID(device.CDIDeviceIDs[0]) - } else { - resourceHealth.ResourceID = v1.ResourceID(fmt.Sprintf("%s/%s/%s", driverName, device.PoolName, device.DeviceName)) - } + resourceHealth := buildResourceHealth( + driverName, + device, + healthInfo, + enableHealthMessage, + ) // Skip if we've already added this resourceID if seenResourceIDs[resourceHealth.ResourceID] { @@ -962,6 +944,70 @@ func (m *Manager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStat } } +func toResourceHealthStatus(health state.DeviceHealthStatus) v1.ResourceHealthStatus { + switch health { + case state.DeviceHealthStatusHealthy: + return v1.ResourceHealthStatusHealthy + case state.DeviceHealthStatusUnhealthy: + return v1.ResourceHealthStatusUnhealthy + default: + return v1.ResourceHealthStatusUnknown + } +} + +func buildResourceHealth(driverName string, device state.Device, healthInfo state.DeviceHealth, enableMessage bool) v1.ResourceHealth { + // Create the ResourceHealth entry + resourceHealth := v1.ResourceHealth{ + Health: toResourceHealthStatus(healthInfo.Health), + } + if enableMessage && len(healthInfo.Message) > 0 { + resourceHealth.Message = &healthInfo.Message + } + // Use first CDI device ID as ResourceID, with fallback + if len(device.CDIDeviceIDs) > 0 { + resourceHealth.ResourceID = v1.ResourceID(device.CDIDeviceIDs[0]) + } else { + resourceHealth.ResourceID = v1.ResourceID(fmt.Sprintf("%s/%s/%s", driverName, device.PoolName, device.DeviceName)) + } + return resourceHealth +} + +func toDeviceHealthStatus(health drahealthv1alpha1.HealthStatus) state.DeviceHealthStatus { + switch health { + case drahealthv1alpha1.HealthStatus_HEALTHY: + return state.DeviceHealthStatusHealthy + case drahealthv1alpha1.HealthStatus_UNHEALTHY: + return state.DeviceHealthStatusUnhealthy + default: + return state.DeviceHealthStatusUnknown + } +} + +func buildDeviceHealth(logger klog.Logger, device *drahealthv1alpha1.DeviceHealth) state.DeviceHealth { + // Extract the health check timeout from the gRPC response + // If not specified, zero, or negative, use the default timeout + timeout := DefaultHealthTimeout + timeoutSeconds := device.GetHealthCheckTimeoutSeconds() + if timeoutSeconds > 0 { + timeout = time.Duration(timeoutSeconds) * time.Second + } else if timeoutSeconds < 0 { + // Log warning for negative timeout values and use default + logger.V(4).Info("Ignoring negative health check timeout, using default", + "poolName", device.GetDevice().GetPoolName(), + "deviceName", device.GetDevice().GetDeviceName(), + "providedTimeout", timeoutSeconds, + "defaultTimeout", DefaultHealthTimeout) + } + return state.DeviceHealth{ + PoolName: device.GetDevice().GetPoolName(), + DeviceName: device.GetDevice().GetDeviceName(), + Health: toDeviceHealthStatus(device.GetHealth()), + LastUpdated: time.Unix(device.GetLastUpdatedTime(), 0), + HealthCheckTimeout: timeout, + Message: truncateHealthMessage(device.GetMessage()), + } +} + // HandleWatchResourcesStream processes health updates from the DRA plugin. func (m *Manager) HandleWatchResourcesStream(ctx context.Context, stream drahealthv1alpha1.DRAResourceHealth_NodeWatchResourcesClient, pluginName string) error { logger := klog.FromContext(ctx).WithName("dra-manager") @@ -996,39 +1042,7 @@ func (m *Manager) HandleWatchResourcesStream(ctx context.Context, stream draheal // Convert drahealthv1alpha1.DeviceHealth to state.DeviceHealth devices := make([]state.DeviceHealth, len(resp.GetDevices())) for i, d := range resp.GetDevices() { - var health state.DeviceHealthStatus - switch d.GetHealth() { - case drahealthv1alpha1.HealthStatus_HEALTHY: - health = state.DeviceHealthStatusHealthy - case drahealthv1alpha1.HealthStatus_UNHEALTHY: - health = state.DeviceHealthStatusUnhealthy - default: - health = state.DeviceHealthStatusUnknown - } - - // Extract the health check timeout from the gRPC response - // If not specified, zero, or negative, use the default timeout - timeout := DefaultHealthTimeout - timeoutSeconds := d.GetHealthCheckTimeoutSeconds() - if timeoutSeconds > 0 { - timeout = time.Duration(timeoutSeconds) * time.Second - } else if timeoutSeconds < 0 { - // Log warning for negative timeout values and use default - logger.V(4).Info("Ignoring negative health check timeout, using default", - "poolName", d.GetDevice().GetPoolName(), - "deviceName", d.GetDevice().GetDeviceName(), - "providedTimeout", timeoutSeconds, - "defaultTimeout", DefaultHealthTimeout) - } - - devices[i] = state.DeviceHealth{ - PoolName: d.GetDevice().GetPoolName(), - DeviceName: d.GetDevice().GetDeviceName(), - Health: health, - LastUpdated: time.Unix(d.GetLastUpdatedTime(), 0), - HealthCheckTimeout: timeout, - Message: truncateHealthMessage(d.GetMessage()), - } + devices[i] = buildDeviceHealth(logger, d) } changedDevices, updateErr := m.healthInfoCache.updateHealthInfo(logger, pluginName, devices) diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index c8f291ba377..61c5f9eef81 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -2534,3 +2534,239 @@ func TestTruncateHealthMessage(t *testing.T) { }) } } + +func TestBuildResourceHealth(t *testing.T) { + testCases := map[string]struct { + device state.Device + healthInfo state.DeviceHealth + enableMessage bool + wantHealth v1.ResourceHealthStatus + wantResource v1.ResourceID + wantMessage *string + }{ + "message enabled and CDI id": { + device: state.Device{ + PoolName: "pool", + DeviceName: "device", + CDIDeviceIDs: []string{"driver/pool=device"}, + }, + healthInfo: state.DeviceHealth{ + Health: state.DeviceHealthStatusHealthy, + Message: "device ok", + }, + enableMessage: true, + wantHealth: v1.ResourceHealthStatusHealthy, + wantResource: v1.ResourceID("driver/pool=device"), + wantMessage: ptr.To("device ok"), + }, + "message disabled": { + device: state.Device{ + PoolName: "pool", + DeviceName: "device", + CDIDeviceIDs: []string{"driver/pool=device"}, + }, + healthInfo: state.DeviceHealth{ + Health: state.DeviceHealthStatusHealthy, + Message: "device ok", + }, + enableMessage: false, + wantHealth: v1.ResourceHealthStatusHealthy, + wantResource: v1.ResourceID("driver/pool=device"), + wantMessage: nil, + }, + "empty message": { + device: state.Device{ + PoolName: "pool", + DeviceName: "device", + CDIDeviceIDs: []string{"driver/pool=device"}, + }, + healthInfo: state.DeviceHealth{ + Health: state.DeviceHealthStatusHealthy, + Message: "", + }, + enableMessage: true, + wantHealth: v1.ResourceHealthStatusHealthy, + wantResource: v1.ResourceID("driver/pool=device"), + wantMessage: nil, + }, + "unhealthy status": { + device: state.Device{ + PoolName: "pool", + DeviceName: "device", + CDIDeviceIDs: []string{"driver/pool=device"}, + }, + healthInfo: state.DeviceHealth{ + Health: state.DeviceHealthStatusUnhealthy, + Message: "device ok", + }, + enableMessage: true, + wantHealth: v1.ResourceHealthStatusUnhealthy, + wantResource: v1.ResourceID("driver/pool=device"), + wantMessage: ptr.To("device ok"), + }, + "unknown status": { + device: state.Device{ + PoolName: "pool", + DeviceName: "device", + CDIDeviceIDs: []string{"driver/pool=device"}, + }, + healthInfo: state.DeviceHealth{ + Health: state.DeviceHealthStatusUnknown, + Message: "device ok", + }, + enableMessage: true, + wantHealth: v1.ResourceHealthStatusUnknown, + wantResource: v1.ResourceID("driver/pool=device"), + wantMessage: ptr.To("device ok"), + }, + "fallback resource id": { + device: state.Device{ + PoolName: "pool", + DeviceName: "device", + }, + healthInfo: state.DeviceHealth{ + Health: state.DeviceHealthStatusUnhealthy, + Message: "device ok", + }, + enableMessage: true, + wantHealth: v1.ResourceHealthStatusUnhealthy, + wantResource: v1.ResourceID("driver/pool/device"), + wantMessage: ptr.To("device ok"), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + got := buildResourceHealth("driver", tc.device, tc.healthInfo, tc.enableMessage) + assert.Equal(t, tc.wantHealth, got.Health) + assert.Equal(t, tc.wantResource, got.ResourceID) + assert.Equal(t, tc.wantMessage, got.Message) + }) + } +} + +func TestToResourceHealthStatus(t *testing.T) { + testCases := map[string]struct { + input state.DeviceHealthStatus + expected v1.ResourceHealthStatus + }{ + "healthy": { + input: state.DeviceHealthStatusHealthy, + expected: v1.ResourceHealthStatusHealthy, + }, + "unhealthy": { + input: state.DeviceHealthStatusUnhealthy, + expected: v1.ResourceHealthStatusUnhealthy, + }, + "unknown": { + input: state.DeviceHealthStatusUnknown, + expected: v1.ResourceHealthStatusUnknown, + }, + "empty": { + input: state.DeviceHealthStatus(""), + expected: v1.ResourceHealthStatusUnknown, + }, + "unexpected": { + input: state.DeviceHealthStatus("Other"), + expected: v1.ResourceHealthStatusUnknown, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.expected, toResourceHealthStatus(tc.input)) + }) + } +} + +func TestToDeviceHealthStatus(t *testing.T) { + testCases := map[string]struct { + input drahealthv1alpha1.HealthStatus + expected state.DeviceHealthStatus + }{ + "healthy": { + input: drahealthv1alpha1.HealthStatus_HEALTHY, + expected: state.DeviceHealthStatusHealthy, + }, + "unhealthy": { + input: drahealthv1alpha1.HealthStatus_UNHEALTHY, + expected: state.DeviceHealthStatusUnhealthy, + }, + "unknown": { + input: drahealthv1alpha1.HealthStatus_UNKNOWN, + expected: state.DeviceHealthStatusUnknown, + }, + "unexpected": { + input: drahealthv1alpha1.HealthStatus(99), + expected: state.DeviceHealthStatusUnknown, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.expected, toDeviceHealthStatus(tc.input)) + }) + } +} + +func TestBuildDeviceHealth(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + longMessage := strings.Repeat("a", v1.ResourceHealthMessageMaxLength+5) + + testCases := map[string]struct { + health drahealthv1alpha1.HealthStatus + timeoutSeconds int64 + message string + wantHealth state.DeviceHealthStatus + wantTimeout time.Duration + wantMessage string + }{ + "healthy with positive timeout": { + health: drahealthv1alpha1.HealthStatus_HEALTHY, + timeoutSeconds: 12, + message: "ok", + wantHealth: state.DeviceHealthStatusHealthy, + wantTimeout: 12 * time.Second, + wantMessage: "ok", + }, + "unhealthy with zero timeout": { + health: drahealthv1alpha1.HealthStatus_UNHEALTHY, + timeoutSeconds: 0, + message: "fail", + wantHealth: state.DeviceHealthStatusUnhealthy, + wantTimeout: DefaultHealthTimeout, + wantMessage: "fail", + }, + "unknown with negative timeout": { + health: drahealthv1alpha1.HealthStatus_UNKNOWN, + timeoutSeconds: -1, + message: longMessage, + wantHealth: state.DeviceHealthStatusUnknown, + wantTimeout: DefaultHealthTimeout, + wantMessage: truncateHealthMessage(longMessage), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + grpcDevice := &drahealthv1alpha1.DeviceHealth{ + Device: &drahealthv1alpha1.DeviceIdentifier{ + PoolName: "pool", + DeviceName: "device", + }, + Health: tc.health, + LastUpdatedTime: 123, + HealthCheckTimeoutSeconds: tc.timeoutSeconds, + Message: tc.message, + } + + got := buildDeviceHealth(logger, grpcDevice) + assert.Equal(t, "pool", got.PoolName) + assert.Equal(t, "device", got.DeviceName) + assert.Equal(t, tc.wantHealth, got.Health) + assert.Equal(t, time.Unix(123, 0), got.LastUpdated) + assert.Equal(t, tc.wantTimeout, got.HealthCheckTimeout) + assert.Equal(t, tc.wantMessage, got.Message) + }) + } +}