From aa5e5ea9d6e825bd9e5c39aeb9553b2950bea96a Mon Sep 17 00:00:00 2001 From: Nour Date: Fri, 13 Feb 2026 17:04:33 +0200 Subject: [PATCH] scheduler: use contextual logging for event emission Signed-off-by: Nour --- cmd/kube-scheduler/app/server.go | 2 +- .../plugins/dynamicresources/dynamicresources.go | 2 +- pkg/scheduler/framework/preemption/executor.go | 2 +- pkg/scheduler/framework/runtime/framework.go | 8 ++++---- pkg/scheduler/profile/profile.go | 4 ++-- pkg/scheduler/profile/profile_test.go | 2 +- pkg/scheduler/schedule_one.go | 9 +++++---- pkg/scheduler/schedule_one_test.go | 10 +++++----- pkg/scheduler/scheduler_test.go | 6 +++--- .../src/k8s.io/kube-scheduler/framework/interface.go | 2 +- 10 files changed, 24 insertions(+), 23 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 315a240131e..4d0ebfd19af 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -408,7 +408,7 @@ func newEndpointsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, } func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory { - return func(name string) events.EventRecorder { + return func(name string) events.EventRecorderLogger { return cc.EventBroadcaster.NewRecorder(name) } } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 1a7183a5c63..de0dfbec2a5 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -1183,7 +1183,7 @@ func (pl *DynamicResources) PreBind(ctx context.Context, cs fwk.CycleState, pod } // We need to wait for the device to be attached to the node. - pl.fh.EventRecorder().Eventf(pod, nil, v1.EventTypeNormal, "BindingConditionsPending", "Scheduling", "waiting for binding conditions for device on node %s", nodeName) + pl.fh.EventRecorder().WithLogger(logger).Eventf(pod, nil, v1.EventTypeNormal, "BindingConditionsPending", "Scheduling", "waiting for binding conditions for device on node %s", nodeName) // START: Record start time for metrics duration calculation start := time.Now() err = wait.PollUntilContextTimeout(ctx, 5*time.Second, pl.bindingTimeout, true, diff --git a/pkg/scheduler/framework/preemption/executor.go b/pkg/scheduler/framework/preemption/executor.go index 739e9d7e299..98ccc3d6347 100644 --- a/pkg/scheduler/framework/preemption/executor.go +++ b/pkg/scheduler/framework/preemption/executor.go @@ -127,7 +127,7 @@ func newExecutor(fh fwk.Handle) *Executor { eventMessage += " (in kube-scheduler memory)." } - fh.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", eventMessage) + fh.EventRecorder().WithLogger(logger).Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", eventMessage) return nil } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 2bfc640293a..40f6435e500 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -84,7 +84,7 @@ type frameworkImpl struct { clientSet clientset.Interface kubeConfig *restclient.Config - eventRecorder events.EventRecorder + eventRecorder events.EventRecorderLogger informerFactory informers.SharedInformerFactory sharedDRAManager fwk.SharedDRAManager podGroupManager fwk.PodGroupManager @@ -148,7 +148,7 @@ type frameworkOptions struct { componentConfigVersion string clientSet clientset.Interface kubeConfig *restclient.Config - eventRecorder events.EventRecorder + eventRecorder events.EventRecorderLogger informerFactory informers.SharedInformerFactory sharedDRAManager fwk.SharedDRAManager sharedCSIManager fwk.CSIManager @@ -194,7 +194,7 @@ func WithKubeConfig(kubeConfig *restclient.Config) Option { } // WithEventRecorder sets clientSet for the scheduling frameworkImpl. -func WithEventRecorder(recorder events.EventRecorder) Option { +func WithEventRecorder(recorder events.EventRecorderLogger) Option { return func(o *frameworkOptions) { o.eventRecorder = recorder } @@ -2159,7 +2159,7 @@ func (f *frameworkImpl) KubeConfig() *restclient.Config { } // EventRecorder returns an event recorder. -func (f *frameworkImpl) EventRecorder() events.EventRecorder { +func (f *frameworkImpl) EventRecorder() events.EventRecorderLogger { return f.eventRecorder } diff --git a/pkg/scheduler/profile/profile.go b/pkg/scheduler/profile/profile.go index 3ac39035ee4..3fc38492bc3 100644 --- a/pkg/scheduler/profile/profile.go +++ b/pkg/scheduler/profile/profile.go @@ -32,7 +32,7 @@ import ( ) // RecorderFactory builds an EventRecorder for a given scheduler name. -type RecorderFactory func(string) events.EventRecorder +type RecorderFactory func(string) events.EventRecorderLogger // newProfile builds a Profile for the given configuration. func newProfile(ctx context.Context, cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, @@ -84,7 +84,7 @@ func (m Map) Close() error { // NewRecorderFactory returns a RecorderFactory for the broadcaster. func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory { - return func(name string) events.EventRecorder { + return func(name string) events.EventRecorderLogger { return b.NewRecorder(scheme.Scheme, name) } } diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go index 32dd6117205..c3bf28d2a15 100644 --- a/pkg/scheduler/profile/profile_test.go +++ b/pkg/scheduler/profile/profile_test.go @@ -296,7 +296,7 @@ func newFakePlugin(name string) func(ctx context.Context, object runtime.Object, } } -func nilRecorderFactory(_ string) events.EventRecorder { +func nilRecorderFactory(_ string) events.EventRecorderLogger { return nil } diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index a49861f7508..c8318322275 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -547,8 +547,9 @@ func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Framework, pod *v1.Pod) bool { // Case 1: pod is being deleted. if pod.DeletionTimestamp != nil { - fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) - klog.FromContext(ctx).V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod)) + logger := klog.FromContext(ctx) + fwk.EventRecorder().WithLogger(logger).Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) + logger.V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod)) return true } @@ -1182,7 +1183,7 @@ func (sched *Scheduler) finishBinding(logger klog.Logger, fwk framework.Framewor return } - fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) + fwk.EventRecorder().WithLogger(logger).Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) } func getAttemptsLabel(p *framework.QueuedPodInfo) string { @@ -1281,7 +1282,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, podFwk fram } msg := truncateMessage(errMsg) - podFwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + podFwk.EventRecorder().WithLogger(logger).Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) if err := updatePod(ctx, sched.client, podFwk.APICacher(), pod, &v1.PodCondition{ Type: v1.PodScheduled, ObservedGeneration: podutil.CalculatePodConditionObservedGeneration(&pod.Status, pod.Generation, v1.PodScheduled), diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 8d73c08a396..d6469f9582c 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -1794,7 +1794,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) { sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) { return item.mockScheduleResult, item.injectSchedulingError } - sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) { + sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) { gotCallsToFailureHandler++ gotPodIsInFlightAtFailureHandler = podListContainsPod(queue.InFlightPods(), p.Pod) @@ -1802,7 +1802,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) { gotError = status.AsError() msg := truncateMessage(gotError.Error()) - fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + fwk.EventRecorder().WithLogger(klog.FromContext(ctx)).Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) queue.Done(p.Pod.UID) } @@ -4623,7 +4623,7 @@ func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, clien func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Interface, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, apiDispatcher *apidispatcher.APIDispatcher, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...tf.RegisterPluginFunc) (*Scheduler, chan error) { - var recorder events.EventRecorder + var recorder events.EventRecorderLogger if broadcaster != nil { recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName) } else { @@ -4669,12 +4669,12 @@ func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Inte } sched.SchedulePod = sched.schedulePod - sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) { + sched.FailureHandler = func(ctx context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) { err := status.AsError() errChan <- err msg := truncateMessage(err.Error()) - schedFramework.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + schedFramework.EventRecorder().WithLogger(klog.FromContext(ctx)).Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) } return sched, errChan } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 38bf3d75329..88db89f9a0b 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -1442,10 +1442,10 @@ func (*emptyEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]fwk.C // fakePermitPlugin only implements PermitPlugin interface. type fakePermitPlugin struct { - eventRecorder events.EventRecorder + eventRecorder events.EventRecorderLogger } -func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory { +func newFakePermitPlugin(eventRecorder events.EventRecorderLogger) frameworkruntime.PluginFactory { return func(ctx context.Context, configuration runtime.Object, f fwk.Handle) (fwk.Plugin, error) { pl := &fakePermitPlugin{ eventRecorder: eventRecorder, @@ -1466,7 +1466,7 @@ const ( func (f fakePermitPlugin) Permit(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) (*fwk.Status, time.Duration) { defer func() { // Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage. - f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "") + f.eventRecorder.WithLogger(klog.FromContext(ctx)).Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "") }() return fwk.NewStatus(fwk.Wait), permitTimeout diff --git a/staging/src/k8s.io/kube-scheduler/framework/interface.go b/staging/src/k8s.io/kube-scheduler/framework/interface.go index 52d6b48f9f6..5575c2dd821 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/interface.go +++ b/staging/src/k8s.io/kube-scheduler/framework/interface.go @@ -852,7 +852,7 @@ type Handle interface { KubeConfig() *restclient.Config // EventRecorder returns an event recorder. - EventRecorder() events.EventRecorder + EventRecorder() events.EventRecorderLogger SharedInformerFactory() informers.SharedInformerFactory