diff --git a/pkg/drain/drain.go b/pkg/drain/drain.go index 9f2e36b37..c7a397e11 100644 --- a/pkg/drain/drain.go +++ b/pkg/drain/drain.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -268,8 +269,22 @@ func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error { return d.deletePods(pods, getPodFn) } +// syncWriter serializes concurrent Write calls with a mutex. +type syncWriter struct { + mu sync.Mutex + w io.Writer +} + +func (s *syncWriter) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.w.Write(p) +} + func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupVersion, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { returnCh := make(chan error, 1) + out := io.Writer(&syncWriter{w: d.Out}) + errOut := io.Writer(&syncWriter{w: d.ErrOut}) // 0 timeout means infinite, we use MaxInt64 to represent it. var globalTimeout time.Duration if d.Timeout == 0 { @@ -286,13 +301,13 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV switch d.DryRunStrategy { case cmdutil.DryRunServer: //nolint:errcheck - fmt.Fprintf(d.Out, "evicting pod %s/%s (server dry run)\n", activePod.Namespace, activePod.Name) + fmt.Fprintf(out, "evicting pod %s/%s (server dry run)\n", activePod.Namespace, activePod.Name) default: if d.OnPodDeletionOrEvictionStarted != nil { d.OnPodDeletionOrEvictionStarted(&activePod, true) } //nolint:errcheck - fmt.Fprintf(d.Out, "evicting pod %s/%s\n", activePod.Namespace, activePod.Name) + fmt.Fprintf(out, "evicting pod %s/%s\n", activePod.Namespace, activePod.Name) } select { case <-ctx.Done(): @@ -310,7 +325,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV return } else if apierrors.IsTooManyRequests(err) { //nolint:errcheck - fmt.Fprintf(d.ErrOut, "error when evicting pods/%q -n %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err) + fmt.Fprintf(errOut, "error when evicting pods/%q -n %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err) time.Sleep(d.EvictErrorRetryDelay) } else if !activePod.ObjectMeta.DeletionTimestamp.IsZero() && apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) { // an eviction request in a deleting namespace will throw a forbidden error, @@ -321,7 +336,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV // an eviction request in a deleting namespace will throw a forbidden error, // if the pod is not marked deleted, we retry until it is. //nolint:errcheck - fmt.Fprintf(d.ErrOut, "error when evicting pod %q from terminating namespace %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err) + fmt.Fprintf(errOut, "error when evicting pod %q from terminating namespace %q (will retry after %v): %v\n", activePod.Name, activePod.Namespace, d.EvictErrorRetryDelay, err) time.Sleep(d.EvictErrorRetryDelay) } else { returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: %v", activePod.Name, activePod.Namespace, err) @@ -349,7 +364,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupV onFinishFn: d.OnPodDeletionOrEvictionFinished, globalTimeout: globalTimeout, skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds, - out: d.Out, + out: out, } _, err := waitForDelete(params) if err == nil {