Merge pull request #137015 from nmn3m/contextual-logging-scheduler-events

scheduler: use contextual logging for event emission
This commit is contained in:
Kubernetes Prow Robot 2026-03-20 03:12:31 +05:30 committed by GitHub
commit 734aeb836d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 24 additions and 23 deletions

View file

@ -408,7 +408,7 @@ func newEndpointsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration,
}
func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
return func(name string) events.EventRecorder {
return func(name string) events.EventRecorderLogger {
return cc.EventBroadcaster.NewRecorder(name)
}
}

View file

@ -1183,7 +1183,7 @@ func (pl *DynamicResources) PreBind(ctx context.Context, cs fwk.CycleState, pod
}
// We need to wait for the device to be attached to the node.
pl.fh.EventRecorder().Eventf(pod, nil, v1.EventTypeNormal, "BindingConditionsPending", "Scheduling", "waiting for binding conditions for device on node %s", nodeName)
pl.fh.EventRecorder().WithLogger(logger).Eventf(pod, nil, v1.EventTypeNormal, "BindingConditionsPending", "Scheduling", "waiting for binding conditions for device on node %s", nodeName)
// START: Record start time for metrics duration calculation
start := time.Now()
err = wait.PollUntilContextTimeout(ctx, 5*time.Second, pl.bindingTimeout, true,

View file

@ -127,7 +127,7 @@ func newExecutor(fh fwk.Handle) *Executor {
eventMessage += " (in kube-scheduler memory)."
}
fh.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", eventMessage)
fh.EventRecorder().WithLogger(logger).Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", eventMessage)
return nil
}

View file

@ -84,7 +84,7 @@ type frameworkImpl struct {
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
eventRecorder events.EventRecorderLogger
informerFactory informers.SharedInformerFactory
sharedDRAManager fwk.SharedDRAManager
podGroupManager fwk.PodGroupManager
@ -148,7 +148,7 @@ type frameworkOptions struct {
componentConfigVersion string
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
eventRecorder events.EventRecorderLogger
informerFactory informers.SharedInformerFactory
sharedDRAManager fwk.SharedDRAManager
sharedCSIManager fwk.CSIManager
@ -194,7 +194,7 @@ func WithKubeConfig(kubeConfig *restclient.Config) Option {
}
// WithEventRecorder sets clientSet for the scheduling frameworkImpl.
func WithEventRecorder(recorder events.EventRecorder) Option {
func WithEventRecorder(recorder events.EventRecorderLogger) Option {
return func(o *frameworkOptions) {
o.eventRecorder = recorder
}
@ -2159,7 +2159,7 @@ func (f *frameworkImpl) KubeConfig() *restclient.Config {
}
// EventRecorder returns an event recorder.
func (f *frameworkImpl) EventRecorder() events.EventRecorder {
func (f *frameworkImpl) EventRecorder() events.EventRecorderLogger {
return f.eventRecorder
}

View file

@ -32,7 +32,7 @@ import (
)
// RecorderFactory builds an EventRecorder for a given scheduler name.
type RecorderFactory func(string) events.EventRecorder
type RecorderFactory func(string) events.EventRecorderLogger
// newProfile builds a Profile for the given configuration.
func newProfile(ctx context.Context, cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
@ -84,7 +84,7 @@ func (m Map) Close() error {
// NewRecorderFactory returns a RecorderFactory for the broadcaster.
func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory {
return func(name string) events.EventRecorder {
return func(name string) events.EventRecorderLogger {
return b.NewRecorder(scheme.Scheme, name)
}
}

View file

@ -296,7 +296,7 @@ func newFakePlugin(name string) func(ctx context.Context, object runtime.Object,
}
}
func nilRecorderFactory(_ string) events.EventRecorder {
func nilRecorderFactory(_ string) events.EventRecorderLogger {
return nil
}

View file

@ -547,8 +547,9 @@ func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error
func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Framework, pod *v1.Pod) bool {
// Case 1: pod is being deleted.
if pod.DeletionTimestamp != nil {
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.FromContext(ctx).V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod))
logger := klog.FromContext(ctx)
fwk.EventRecorder().WithLogger(logger).Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
logger.V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod))
return true
}
@ -1182,7 +1183,7 @@ func (sched *Scheduler) finishBinding(logger klog.Logger, fwk framework.Framewor
return
}
fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
fwk.EventRecorder().WithLogger(logger).Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}
func getAttemptsLabel(p *framework.QueuedPodInfo) string {
@ -1281,7 +1282,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, podFwk fram
}
msg := truncateMessage(errMsg)
podFwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
podFwk.EventRecorder().WithLogger(logger).Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
if err := updatePod(ctx, sched.client, podFwk.APICacher(), pod, &v1.PodCondition{
Type: v1.PodScheduled,
ObservedGeneration: podutil.CalculatePodConditionObservedGeneration(&pod.Status, pod.Generation, v1.PodScheduled),

View file

@ -1794,7 +1794,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) {
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
gotCallsToFailureHandler++
gotPodIsInFlightAtFailureHandler = podListContainsPod(queue.InFlightPods(), p.Pod)
@ -1802,7 +1802,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
gotError = status.AsError()
msg := truncateMessage(gotError.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
fwk.EventRecorder().WithLogger(klog.FromContext(ctx)).Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
queue.Done(p.Pod.UID)
}
@ -4623,7 +4623,7 @@ func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, clien
func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Interface, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, apiDispatcher *apidispatcher.APIDispatcher,
informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...tf.RegisterPluginFunc) (*Scheduler, chan error) {
var recorder events.EventRecorder
var recorder events.EventRecorderLogger
if broadcaster != nil {
recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
} else {
@ -4669,12 +4669,12 @@ func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Inte
}
sched.SchedulePod = sched.schedulePod
sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
sched.FailureHandler = func(ctx context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
err := status.AsError()
errChan <- err
msg := truncateMessage(err.Error())
schedFramework.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
schedFramework.EventRecorder().WithLogger(klog.FromContext(ctx)).Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
}
return sched, errChan
}

View file

@ -1442,10 +1442,10 @@ func (*emptyEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]fwk.C
// fakePermitPlugin only implements PermitPlugin interface.
type fakePermitPlugin struct {
eventRecorder events.EventRecorder
eventRecorder events.EventRecorderLogger
}
func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
func newFakePermitPlugin(eventRecorder events.EventRecorderLogger) frameworkruntime.PluginFactory {
return func(ctx context.Context, configuration runtime.Object, f fwk.Handle) (fwk.Plugin, error) {
pl := &fakePermitPlugin{
eventRecorder: eventRecorder,
@ -1466,7 +1466,7 @@ const (
func (f fakePermitPlugin) Permit(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
defer func() {
// Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage.
f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
f.eventRecorder.WithLogger(klog.FromContext(ctx)).Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
}()
return fwk.NewStatus(fwk.Wait), permitTimeout

View file

@ -852,7 +852,7 @@ type Handle interface {
KubeConfig() *restclient.Config
// EventRecorder returns an event recorder.
EventRecorder() events.EventRecorder
EventRecorder() events.EventRecorderLogger
SharedInformerFactory() informers.SharedInformerFactory