kubelet: DRA: claiminfo: improve logging

- removed the logger field from the claimInfoCache struct; the logger is now passed to withLock as a parameter
- added the "dra-claiminfo" logging prefix
This commit is contained in:
Ed Bartosh 2026-01-26 17:24:24 +02:00
parent acff01fe8b
commit 2f82dc6dce
3 changed files with 22 additions and 28 deletions

View file

@ -45,8 +45,6 @@ type ClaimInfo struct {
// claimInfoCache is a cache of processed resource claims keyed by namespace/claimname.
type claimInfoCache struct {
logger klog.Logger
sync.RWMutex
checkpointer state.Checkpointer
claimInfo map[string]*ClaimInfo
@ -137,7 +135,7 @@ func (info *ClaimInfo) cdiDevicesAsList(requestName string) []kubecontainer.CDID
}
// newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present).
func newClaimInfoCache(logger klog.Logger, stateDir, checkpointName string) (*claimInfoCache, error) {
func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
checkpointer, err := state.NewCheckpointer(stateDir, checkpointName)
if err != nil {
return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove DRA state file, err: %w", err)
@ -149,7 +147,6 @@ func newClaimInfoCache(logger klog.Logger, stateDir, checkpointName string) (*cl
}
cache := &claimInfoCache{
logger: logger,
checkpointer: checkpointer,
claimInfo: make(map[string]*ClaimInfo),
}
@ -169,11 +166,12 @@ func newClaimInfoCache(logger klog.Logger, stateDir, checkpointName string) (*cl
// withLock runs a function while holding the claimInfoCache lock.
// It logs changes.
func (cache *claimInfoCache) withLock(f func() error) error {
func (cache *claimInfoCache) withLock(logger klog.Logger, f func() error) error {
logger = logger.WithName("dra-claiminfo")
cache.Lock()
defer cache.Unlock()
if loggerV := cache.logger.V(5); loggerV.Enabled() {
if loggerV := logger.V(5); loggerV.Enabled() {
claimsInUseBefore := cache.claimsInUse()
defer func() {
claimsInUseAfter := cache.claimsInUse()
@ -188,7 +186,7 @@ func (cache *claimInfoCache) withLock(f func() error) error {
}
if changed {
cache.logger.V(5).Info("ResourceClaim usage changed", "claimsInUse", delta)
logger.V(5).Info("ResourceClaim usage changed", "claimsInUse", delta)
}
}()
}

View file

@ -445,8 +445,7 @@ func TestNewClaimInfoCache(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
tCtx := ktesting.Init(t)
result, err := newClaimInfoCache(tCtx.Logger(), test.stateDir, test.checkpointName)
result, err := newClaimInfoCache(test.stateDir, test.checkpointName)
if test.wantErr {
assert.Error(t, err)
return
@ -505,10 +504,10 @@ func TestClaimInfoCacheWithLock(t *testing.T) {
} {
t.Run(test.description, func(t *testing.T) {
tCtx := ktesting.Init(t)
cache, err := newClaimInfoCache(tCtx.Logger(), t.TempDir(), "test-checkpoint")
cache, err := newClaimInfoCache(t.TempDir(), "test-checkpoint")
require.NoError(t, err)
assert.NotNil(t, cache)
err = cache.withLock(test.funcGen(cache))
err = cache.withLock(tCtx.Logger(), test.funcGen(cache))
if test.wantErr {
assert.Error(t, err)
return
@ -565,8 +564,7 @@ func TestClaimInfoCacheWithRLock(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
tCtx := ktesting.Init(t)
cache, err := newClaimInfoCache(tCtx.Logger(), t.TempDir(), "test-checkpoint")
cache, err := newClaimInfoCache(t.TempDir(), "test-checkpoint")
require.NoError(t, err)
assert.NotNil(t, cache)
err = cache.withRLock(test.funcGen(cache))
@ -619,7 +617,7 @@ dra_resource_claims_in_use{driver_name="<any>"} 1
dra_resource_claims_in_use{driver_name="other-test-driver"} 1
dra_resource_claims_in_use{driver_name="test-driver"} 1
`,
expectLog: `INFO ResourceClaim usage changed claimsInUse=<
expectLog: `INFO dra-claiminfo: ResourceClaim usage changed claimsInUse=<
<any>: 1 (+1)
other-test-driver: 1 (+1)
test-driver: 1 (+1)
@ -655,7 +653,7 @@ dra_resource_claims_in_use{driver_name="<any>"} 2
dra_resource_claims_in_use{driver_name="other-test-driver"} 1
dra_resource_claims_in_use{driver_name="test-driver"} 2
`,
expectLog: `INFO ResourceClaim usage changed claimsInUse=<
expectLog: `INFO dra-claiminfo: ResourceClaim usage changed claimsInUse=<
<any>: 2 (+1)
other-test-driver: 1 (+1)
test-driver: 2 (+1)
@ -665,13 +663,13 @@ dra_resource_claims_in_use{driver_name="test-driver"} 2
} {
t.Run(test.description, func(t *testing.T) {
tCtx := ktesting.Init(t, initoption.BufferLogs(true))
cache, err := newClaimInfoCache(tCtx.Logger(), t.TempDir(), "test-checkpoint")
cache, err := newClaimInfoCache(t.TempDir(), "test-checkpoint")
for _, claimInfo := range test.initialClaimInfo {
cache.add(claimInfo)
}
require.NoError(t, err)
assert.NotNil(t, cache)
_ = cache.withLock(func() error {
_ = cache.withLock(tCtx.Logger(), func() error {
cache.add(test.claimInfo)
return nil
})
@ -809,7 +807,7 @@ dra_resource_claims_in_use{driver_name="<any>"} 1
dra_resource_claims_in_use{driver_name="test-driver"} 1
dra_resource_claims_in_use{driver_name="other-test-driver"} 1
`,
expectLog: `INFO ResourceClaim usage changed claimsInUse=<
expectLog: `INFO dra-claiminfo: ResourceClaim usage changed claimsInUse=<
<any>: 1 (-1)
other-test-driver: 1 (+0)
test-driver: 1 (-1)
@ -827,8 +825,7 @@ dra_resource_claims_in_use{driver_name="<any>"} 0
} {
t.Run(test.description, func(t *testing.T) {
tCtx := ktesting.Init(t, initoption.BufferLogs(true))
test.claimInfoCache.logger = tCtx.Logger()
_ = test.claimInfoCache.withLock(func() error {
_ = test.claimInfoCache.withLock(tCtx.Logger(), func() error {
test.claimInfoCache.delete(claimName, namespace)
return nil
})
@ -886,8 +883,7 @@ func TestSyncToCheckpoint(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
tCtx := ktesting.Init(t)
cache, err := newClaimInfoCache(tCtx.Logger(), test.stateDir, test.checkpointName)
cache, err := newClaimInfoCache(test.stateDir, test.checkpointName)
require.NoError(t, err)
err = cache.syncToCheckpoint()
if test.wantErr {

View file

@ -118,7 +118,7 @@ type Manager struct {
// - Avoid repeated "failed to ...: failed to ..." when wrapping errors.
// - Avoid wrapping when it does not provide relevant additional information to keep the user-visible error short.
func NewManager(logger klog.Logger, kubeClient clientset.Interface, stateFileDirectory string) (*Manager, error) {
claimInfoCache, err := newClaimInfoCache(logger, stateFileDirectory, draManagerStateFileName)
claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
if err != nil {
return nil, fmt.Errorf("create ResourceClaim cache: %w", err)
}
@ -327,7 +327,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
// Now that we have everything that we need, we can update the claim info cache.
// Almost nothing can go wrong anymore at this point.
err = m.cache.withLock(func() error {
err = m.cache.withLock(logger, func() error {
for i := range podResourceClaims {
resourceClaim := infos[i].resourceClaim
podClaim := infos[i].podClaim
@ -412,7 +412,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
claim := resourceClaims[types.UID(claimUID)]
// Add the prepared CDI devices to the claim info
err := m.cache.withLock(func() error {
err := m.cache.withLock(logger, func() error {
info, exists := m.cache.get(claim.Name, claim.Namespace)
if !exists {
return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name)
@ -437,7 +437,7 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
}
// Atomically perform some operations on the claimInfo cache.
err = m.cache.withLock(func() error {
err = m.cache.withLock(logger, func() error {
// Mark all pod claims as prepared.
for _, claim := range resourceClaims {
info, exists := m.cache.get(claim.Name, claim.Namespace)
@ -594,7 +594,7 @@ func (m *Manager) unprepareResources(ctx context.Context, podUID types.UID, name
claimNamesMap := make(map[types.UID]string)
for _, claimName := range claimNames {
// Atomically perform some operations on the claimInfo cache.
err := m.cache.withLock(func() error {
err := m.cache.withLock(logger, func() error {
// Get the claim info from the cache
claimInfo, exists := m.cache.get(claimName, namespace)
@ -670,7 +670,7 @@ func (m *Manager) unprepareResources(ctx context.Context, podUID types.UID, name
}
// Atomically perform some operations on the claimInfo cache.
err := m.cache.withLock(func() error {
err := m.cache.withLock(logger, func() error {
// TODO(#132978): Re-evaluate this logic to support post-mortem health updates.
// As of the initial implementation, we immediately delete the claim info upon
// unprepare. This means a late-arriving health update for a terminated pod