diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go index f743258a14f..c3dd650500e 100644 --- a/pkg/kubelet/cm/dra/claiminfo.go +++ b/pkg/kubelet/cm/dra/claiminfo.go @@ -45,8 +45,6 @@ type ClaimInfo struct { // claimInfoCache is a cache of processed resource claims keyed by namespace/claimname. type claimInfoCache struct { - logger klog.Logger - sync.RWMutex checkpointer state.Checkpointer claimInfo map[string]*ClaimInfo @@ -137,7 +135,7 @@ func (info *ClaimInfo) cdiDevicesAsList(requestName string) []kubecontainer.CDID } // newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present). -func newClaimInfoCache(logger klog.Logger, stateDir, checkpointName string) (*claimInfoCache, error) { +func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) { checkpointer, err := state.NewCheckpointer(stateDir, checkpointName) if err != nil { return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove DRA state file, err: %w", err) @@ -149,7 +147,6 @@ func newClaimInfoCache(logger klog.Logger, stateDir, checkpointName string) (*cl } cache := &claimInfoCache{ - logger: logger, checkpointer: checkpointer, claimInfo: make(map[string]*ClaimInfo), } @@ -169,11 +166,12 @@ func newClaimInfoCache(logger klog.Logger, stateDir, checkpointName string) (*cl // withLock runs a function while holding the claimInfoCache lock. // It logs changes. -func (cache *claimInfoCache) withLock(f func() error) error { +func (cache *claimInfoCache) withLock(logger klog.Logger, f func() error) error { + logger = logger.WithName("dra-claiminfo") cache.Lock() defer cache.Unlock() - if loggerV := cache.logger.V(5); loggerV.Enabled() { + if loggerV := logger.V(5); loggerV.Enabled() { claimsInUseBefore := cache.claimsInUse() defer func() { claimsInUseAfter := cache.claimsInUse() @@ -188,7 +186,7 @@ func (cache *claimInfoCache) withLock(f func() error) error { } if changed { - cache.logger.V(5).Info("ResourceClaim usage changed", "claimsInUse", delta) + logger.V(5).Info("ResourceClaim usage changed", "claimsInUse", delta) } }() } diff --git a/pkg/kubelet/cm/dra/claiminfo_test.go b/pkg/kubelet/cm/dra/claiminfo_test.go index 71d77086278..1c9baf42026 100644 --- a/pkg/kubelet/cm/dra/claiminfo_test.go +++ b/pkg/kubelet/cm/dra/claiminfo_test.go @@ -445,8 +445,7 @@ func TestNewClaimInfoCache(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - tCtx := ktesting.Init(t) - result, err := newClaimInfoCache(tCtx.Logger(), test.stateDir, test.checkpointName) + result, err := newClaimInfoCache(test.stateDir, test.checkpointName) if test.wantErr { assert.Error(t, err) return @@ -505,10 +504,10 @@ func TestClaimInfoCacheWithLock(t *testing.T) { } { t.Run(test.description, func(t *testing.T) { tCtx := ktesting.Init(t) - cache, err := newClaimInfoCache(tCtx.Logger(), t.TempDir(), "test-checkpoint") + cache, err := newClaimInfoCache(t.TempDir(), "test-checkpoint") require.NoError(t, err) assert.NotNil(t, cache) - err = cache.withLock(test.funcGen(cache)) + err = cache.withLock(tCtx.Logger(), test.funcGen(cache)) if test.wantErr { assert.Error(t, err) return @@ -565,8 +564,7 @@ func TestClaimInfoCacheWithRLock(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - tCtx := ktesting.Init(t) - cache, err := newClaimInfoCache(tCtx.Logger(), t.TempDir(), "test-checkpoint") + cache, err := newClaimInfoCache(t.TempDir(), "test-checkpoint") require.NoError(t, err) assert.NotNil(t, cache) err = cache.withRLock(test.funcGen(cache)) @@ -619,7 +617,7 @@ dra_resource_claims_in_use{driver_name=""} 1 dra_resource_claims_in_use{driver_name="other-test-driver"} 1 dra_resource_claims_in_use{driver_name="test-driver"} 1 `, - expectLog: `INFO ResourceClaim usage changed claimsInUse=< + expectLog: `INFO dra-claiminfo: ResourceClaim usage changed claimsInUse=< : 1 (+1) other-test-driver: 1 (+1) test-driver: 1 (+1) @@ -655,7 +653,7 @@ dra_resource_claims_in_use{driver_name=""} 2 dra_resource_claims_in_use{driver_name="other-test-driver"} 1 dra_resource_claims_in_use{driver_name="test-driver"} 2 `, - expectLog: `INFO ResourceClaim usage changed claimsInUse=< + expectLog: `INFO dra-claiminfo: ResourceClaim usage changed claimsInUse=< : 2 (+1) other-test-driver: 1 (+1) test-driver: 2 (+1) @@ -665,13 +663,13 @@ dra_resource_claims_in_use{driver_name="test-driver"} 2 } { t.Run(test.description, func(t *testing.T) { tCtx := ktesting.Init(t, initoption.BufferLogs(true)) - cache, err := newClaimInfoCache(tCtx.Logger(), t.TempDir(), "test-checkpoint") + cache, err := newClaimInfoCache(t.TempDir(), "test-checkpoint") for _, claimInfo := range test.initialClaimInfo { cache.add(claimInfo) } require.NoError(t, err) assert.NotNil(t, cache) - _ = cache.withLock(func() error { + _ = cache.withLock(tCtx.Logger(), func() error { cache.add(test.claimInfo) return nil }) @@ -809,7 +807,7 @@ dra_resource_claims_in_use{driver_name=""} 1 dra_resource_claims_in_use{driver_name="test-driver"} 1 dra_resource_claims_in_use{driver_name="other-test-driver"} 1 `, - expectLog: `INFO ResourceClaim usage changed claimsInUse=< + expectLog: `INFO dra-claiminfo: ResourceClaim usage changed claimsInUse=< : 1 (-1) other-test-driver: 1 (+0) test-driver: 1 (-1) @@ -827,8 +825,7 @@ dra_resource_claims_in_use{driver_name=""} 0 } { t.Run(test.description, func(t *testing.T) { tCtx := ktesting.Init(t, initoption.BufferLogs(true)) - test.claimInfoCache.logger = tCtx.Logger() - _ = test.claimInfoCache.withLock(func() error { + _ = test.claimInfoCache.withLock(tCtx.Logger(), func() error { test.claimInfoCache.delete(claimName, namespace) return nil }) @@ -886,8 +883,7 @@ func TestSyncToCheckpoint(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - tCtx := ktesting.Init(t) - cache, err := newClaimInfoCache(tCtx.Logger(), test.stateDir, test.checkpointName) + cache, err := newClaimInfoCache(test.stateDir, test.checkpointName) require.NoError(t, err) err = cache.syncToCheckpoint() if test.wantErr { diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 35a02f1e17c..ac7d1f59061 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -118,7 +118,7 @@ type Manager struct { // - Avoid repeated "failed to ...: failed to ..." when wrapping errors. // - Avoid wrapping when it does not provide relevant additional information to keep the user-visible error short. func NewManager(logger klog.Logger, kubeClient clientset.Interface, stateFileDirectory string) (*Manager, error) { - claimInfoCache, err := newClaimInfoCache(logger, stateFileDirectory, draManagerStateFileName) + claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName) if err != nil { return nil, fmt.Errorf("create ResourceClaim cache: %w", err) } @@ -327,7 +327,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error { // Now that we have everything that we need, we can update the claim info cache. // Almost nothing can go wrong anymore at this point. - err = m.cache.withLock(func() error { + err = m.cache.withLock(logger, func() error { for i := range podResourceClaims { resourceClaim := infos[i].resourceClaim podClaim := infos[i].podClaim @@ -412,7 +412,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error { claim := resourceClaims[types.UID(claimUID)] // Add the prepared CDI devices to the claim info - err := m.cache.withLock(func() error { + err := m.cache.withLock(logger, func() error { info, exists := m.cache.get(claim.Name, claim.Namespace) if !exists { return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name) @@ -437,7 +437,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error { } // Atomically perform some operations on the claimInfo cache. - err = m.cache.withLock(func() error { + err = m.cache.withLock(logger, func() error { // Mark all pod claims as prepared. for _, claim := range resourceClaims { info, exists := m.cache.get(claim.Name, claim.Namespace) @@ -594,7 +594,7 @@ func (m *Manager) unprepareResources(ctx context.Context, podUID types.UID, name claimNamesMap := make(map[types.UID]string) for _, claimName := range claimNames { // Atomically perform some operations on the claimInfo cache. - err := m.cache.withLock(func() error { + err := m.cache.withLock(logger, func() error { // Get the claim info from the cache claimInfo, exists := m.cache.get(claimName, namespace) @@ -670,7 +670,7 @@ func (m *Manager) unprepareResources(ctx context.Context, podUID types.UID, name } // Atomically perform some operations on the claimInfo cache. - err := m.cache.withLock(func() error { + err := m.cache.withLock(logger, func() error { // TODO(#132978): Re-evaluate this logic to support post-mortem health updates. // As of the initial implementation, we immediately delete the claim info upon // unprepare. This means a late-arriving health update for a terminated pod