Introduce syncWriter to prevent data races when writing from goroutines

Signed-off-by: Maciej Szulik <soltysh@gmail.com>

Kubernetes-commit: 8a8d82ba9d6ea8d870fded3a24df23f358d734f2
This commit is contained in:
Maciej Szulik 2026-05-05 14:22:46 +02:00 committed by Kubernetes Publisher
parent 97f0e109d6
commit ac82c9f2d6

View file

@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
@ -268,8 +269,22 @@ func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
return d.deletePods(pods, getPodFn)
}
// syncWriter serializes concurrent Write calls with a mutex.
type syncWriter struct {
mu sync.Mutex
w io.Writer
}
func (s *syncWriter) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.w.Write(p)
}
func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupVersion, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
returnCh := make(chan error, 1)
out := io.Writer(&syncWriter{w: d.Out})
errOut := io.Writer(&syncWriter{w: d.ErrOut})
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if d.Timeout == 0 {
@ -286,13 +301,13 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV
switch d.DryRunStrategy {
case cmdutil.DryRunServer:
//nolint:errcheck
fmt.Fprintf(d.Out, "evicting pod %s/%s (server dry run)\n", activePod.Namespace, activePod.Name)
fmt.Fprintf(out, "evicting pod %s/%s (server dry run)\n", activePod.Namespace, activePod.Name)
default:
if d.OnPodDeletionOrEvictionStarted != nil {
d.OnPodDeletionOrEvictionStarted(&activePod, true)
}
//nolint:errcheck
fmt.Fprintf(d.Out, "evicting pod %s/%s\n", activePod.Namespace, activePod.Name)
fmt.Fprintf(out, "evicting pod %s/%s\n", activePod.Namespace, activePod.Name)
}
select {
case <-ctx.Done():
@ -310,7 +325,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV
return
} else if apierrors.IsTooManyRequests(err) {
//nolint:errcheck
fmt.Fprintf(d.ErrOut, "error when evicting pods/%q -n %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err)
fmt.Fprintf(errOut, "error when evicting pods/%q -n %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err)
time.Sleep(d.EvictErrorRetryDelay)
} else if !activePod.ObjectMeta.DeletionTimestamp.IsZero() && apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
// an eviction request in a deleting namespace will throw a forbidden error,
@ -321,7 +336,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV
// an eviction request in a deleting namespace will throw a forbidden error,
// if the pod is not marked deleted, we retry until it is.
//nolint:errcheck
fmt.Fprintf(d.ErrOut, "error when evicting pod %q from terminating namespace %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err)
fmt.Fprintf(errOut, "error when evicting pod %q from terminating namespace %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err)
time.Sleep(d.EvictErrorRetryDelay)
} else {
returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: %v", activePod.Name, activePod.Namespace, err)
@ -349,7 +364,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV
onFinishFn: d.OnPodDeletionOrEvictionFinished,
globalTimeout: globalTimeout,
skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
out: d.Out,
out: out,
}
_, err := waitForDelete(params)
if err == nil {