mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
* Fix a job quota related deadlock
In case ResourceQuota is used and sets a max # of jobs, a CronJob may get
trapped in a deadlock:
1. Job quota for a namespace is reached.
2. CronJob controller can't create a new job, because quota is
reached.
3. Cleanup of jobs owned by a cronjob doesn't happen, because the
control loop iteration ends early with an error when it fails to create
a job.
To fix this, we stop quitting a control loop iteration early when
cronjob reconciliation fails, and always let old jobs be cleaned up.
* Don't reorder imports
* Don't stop requeuing on reconciliation error
Previous code only logged the reconciliation error inside jm.sync() and
didn't return the reconciliation error to its invoker
processNextWorkItem().
Adding a copy-paste back to avoid this issue.
* Remove copy-pasted cleanupFinishedJobs()
Now we always call jm.cleanupFinishedJobs() first and then
jm.syncCronJob().
We also extract cronJobCopy and updateStatus outside jm.syncCronJob
function and pass pointers to them in both jm.syncCronJob and
jm.cleanupFinishedJobs to make delayed updates handling more explicit
and not dependent on the order in which cleanupFinishedJobs and
syncCronJob are invoked.
* Return updateStatus bool instead of changing the reference
* Explicitly ignore err in tests to fix linter
* Fix formatting with update-gofmt.sh
This commit is contained in:
parent
32842f1d00
commit
c0d2ca7bb6
2 changed files with 155 additions and 38 deletions
|
|
@ -196,24 +196,19 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
|
|||
return nil, err
|
||||
}
|
||||
|
||||
cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
|
||||
// cronJobCopy is used to combine all the updates to a
|
||||
// CronJob object and perform an actual update only once.
|
||||
cronJobCopy := cronJob.DeepCopy()
|
||||
|
||||
updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
|
||||
|
||||
requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
|
||||
if err != nil {
|
||||
logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", err)
|
||||
if updateStatus {
|
||||
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
|
||||
logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) {
|
||||
updateStatus = true
|
||||
}
|
||||
|
||||
// Update the CronJob if needed
|
||||
if updateStatus {
|
||||
if updateStatusAfterCleanup || updateStatusAfterSync {
|
||||
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
|
||||
logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
|
||||
return nil, err
|
||||
|
|
@ -225,7 +220,7 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
|
|||
return requeueAfter, nil
|
||||
}
|
||||
// this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
|
||||
return nil, nil
|
||||
return nil, syncErr
|
||||
}
|
||||
|
||||
// resolveControllerRef returns the controller referenced by a ControllerRef,
|
||||
|
|
@ -416,15 +411,12 @@ func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr
|
|||
// syncCronJob reconciles a CronJob with a list of any Jobs that it created.
|
||||
// All known jobs created by "cronJob" should be included in "jobs".
|
||||
// The current time is passed in to facilitate testing.
|
||||
// It returns a copy of the CronJob that is to be used by other functions
|
||||
// that mutates the object
|
||||
// It also returns a bool to indicate an update to api-server is needed
|
||||
// It returns a bool to indicate an update to api-server is needed
|
||||
func (jm *ControllerV2) syncCronJob(
|
||||
ctx context.Context,
|
||||
cronJob *batchv1.CronJob,
|
||||
jobs []*batchv1.Job) (*batchv1.CronJob, *time.Duration, bool, error) {
|
||||
jobs []*batchv1.Job) (*time.Duration, bool, error) {
|
||||
|
||||
cronJob = cronJob.DeepCopy()
|
||||
now := jm.now()
|
||||
updateStatus := false
|
||||
|
||||
|
|
@ -435,7 +427,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
if !found && !IsJobFinished(j) {
|
||||
cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
|
||||
if err != nil {
|
||||
return nil, nil, updateStatus, err
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
if inActiveList(cjCopy, j.ObjectMeta.UID) {
|
||||
cronJob = cjCopy
|
||||
|
|
@ -483,7 +475,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
deleteFromActiveList(cronJob, j.UID)
|
||||
updateStatus = true
|
||||
case err != nil:
|
||||
return cronJob, nil, updateStatus, err
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
// the job is missing in the lister but found in api-server
|
||||
}
|
||||
|
|
@ -491,7 +483,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
if cronJob.DeletionTimestamp != nil {
|
||||
// The CronJob is being deleted.
|
||||
// Don't do anything other than updating status.
|
||||
return cronJob, nil, updateStatus, nil
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
|
|
@ -500,13 +492,13 @@ func (jm *ControllerV2) syncCronJob(
|
|||
if _, err := time.LoadLocation(timeZone); err != nil {
|
||||
logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
|
||||
return cronJob, nil, updateStatus, nil
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
}
|
||||
|
||||
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
|
||||
logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob))
|
||||
return cronJob, nil, updateStatus, nil
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
|
||||
sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
|
||||
|
|
@ -515,7 +507,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
// we should log the error and not reconcile this cronjob until an update to spec
|
||||
logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
|
||||
return cronJob, nil, updateStatus, nil
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
|
||||
scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
|
||||
|
|
@ -524,7 +516,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
// we should log the error and not reconcile this cronjob until an update to spec
|
||||
logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
|
||||
return cronJob, nil, updateStatus, nil
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
if scheduledTime == nil {
|
||||
// no unmet start time, return cj,.
|
||||
|
|
@ -533,7 +525,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
// the scheduled time, that will give atleast 1 unmet time schedule
|
||||
logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
|
||||
t := nextScheduleTimeDuration(cronJob, now, sched)
|
||||
return cronJob, t, updateStatus, nil
|
||||
return t, updateStatus, nil
|
||||
}
|
||||
|
||||
tooLate := false
|
||||
|
|
@ -552,7 +544,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
// and event the next time we process it, and also so the user looking at the status
|
||||
// can see easily that there was a missed execution.
|
||||
t := nextScheduleTimeDuration(cronJob, now, sched)
|
||||
return cronJob, t, updateStatus, nil
|
||||
return t, updateStatus, nil
|
||||
}
|
||||
if inActiveListByName(cronJob, &batchv1.Job{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
|
@ -561,7 +553,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
|
||||
logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
|
||||
t := nextScheduleTimeDuration(cronJob, now, sched)
|
||||
return cronJob, t, updateStatus, nil
|
||||
return t, updateStatus, nil
|
||||
}
|
||||
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
|
||||
// Regardless which source of information we use for the set of active jobs,
|
||||
|
|
@ -576,7 +568,7 @@ func (jm *ControllerV2) syncCronJob(
|
|||
logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
|
||||
t := nextScheduleTimeDuration(cronJob, now, sched)
|
||||
return cronJob, t, updateStatus, nil
|
||||
return t, updateStatus, nil
|
||||
}
|
||||
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
|
||||
for _, j := range cronJob.Status.Active {
|
||||
|
|
@ -584,10 +576,10 @@ func (jm *ControllerV2) syncCronJob(
|
|||
job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
|
||||
if err != nil {
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
|
||||
return cronJob, nil, updateStatus, err
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
|
||||
return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
|
||||
return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
|
||||
}
|
||||
updateStatus = true
|
||||
}
|
||||
|
|
@ -596,19 +588,22 @@ func (jm *ControllerV2) syncCronJob(
|
|||
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
|
||||
if err != nil {
|
||||
logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
|
||||
return cronJob, nil, updateStatus, err
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
|
||||
switch {
|
||||
case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
|
||||
// if the namespace is being terminated, we don't have to do
|
||||
// anything because any creation will fail
|
||||
return nil, updateStatus, err
|
||||
case errors.IsAlreadyExists(err):
|
||||
// If the job is created by other actor, assume it has updated the cronjob status accordingly
|
||||
logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
|
||||
return cronJob, nil, updateStatus, err
|
||||
return nil, updateStatus, err
|
||||
case err != nil:
|
||||
// default error handling
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
|
||||
return cronJob, nil, updateStatus, err
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
|
||||
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
|
||||
|
|
@ -629,14 +624,14 @@ func (jm *ControllerV2) syncCronJob(
|
|||
jobRef, err := getRef(jobResp)
|
||||
if err != nil {
|
||||
logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
|
||||
return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
|
||||
return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
|
||||
}
|
||||
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
|
||||
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
|
||||
updateStatus = true
|
||||
|
||||
t := nextScheduleTimeDuration(cronJob, now, sched)
|
||||
return cronJob, t, updateStatus, nil
|
||||
return t, updateStatus, nil
|
||||
}
|
||||
|
||||
func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package cronjob
|
|||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -35,6 +36,9 @@ import (
|
|||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/utils/pointer"
|
||||
|
||||
"fmt"
|
||||
_ "k8s.io/kubernetes/pkg/apis/batch/install"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
|
|
@ -1237,7 +1241,8 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
|||
return tc.now
|
||||
},
|
||||
}
|
||||
cjCopy, requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), &cj, js)
|
||||
cjCopy := cj.DeepCopy()
|
||||
requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), cjCopy, js)
|
||||
if tc.expectErr && err == nil {
|
||||
t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter)
|
||||
}
|
||||
|
|
@ -1670,3 +1675,120 @@ func TestControllerV2GetJobsToBeReconciled(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestControllerV2CleanupFinishedJobs is a table-driven test that queues a
// single CronJob, runs one processNextWorkItem pass against a fake job
// control, and then checks which finished jobs that fake recorded as deleted.
func TestControllerV2CleanupFinishedJobs(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
// now is the fixed time returned by the controller's clock (jm.now).
now time.Time
|
||||
cronJob *batchv1.CronJob
|
||||
// finishedJobs are all marked complete by the test body before being indexed.
finishedJobs []*batchv1.Job
|
||||
// jobCreateError is handed to the fake job control as CreateErr;
// presumably it is what the fake returns from CreateJob (fake is defined elsewhere).
jobCreateError error
|
||||
// expectedDeletedJobs lists the job names the fake job control should report as deleted.
expectedDeletedJobs []string
|
||||
}{
|
||||
{
|
||||
name: "jobs are still deleted when a cronjob can't create jobs due to jobs quota being reached (avoiding a deadlock)",
|
||||
now: *justAfterTheHour(),
|
||||
cronJob: &batchv1.CronJob{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"},
|
||||
Spec: batchv1.CronJobSpec{
|
||||
Schedule: onTheHour,
|
||||
// History limit of 1 with two finished jobs below: one of them is expected to be removed.
SuccessfulJobsHistoryLimit: pointer.Int32(1),
|
||||
JobTemplate: batchv1.JobTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}},
|
||||
},
|
||||
},
|
||||
Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}},
|
||||
},
|
||||
finishedJobs: []*batchv1.Job{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "foo-ns",
|
||||
Name: "finished-job-started-hour-ago",
|
||||
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
|
||||
},
|
||||
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "foo-ns",
|
||||
Name: "finished-job-started-minute-ago",
|
||||
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
|
||||
},
|
||||
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeTheHour()}},
|
||||
},
|
||||
},
|
||||
// Stand-in for a job-creation failure; the case name frames it as quota exhaustion.
jobCreateError: errors.NewInternalError(fmt.Errorf("quota for # of jobs reached")),
|
||||
expectedDeletedJobs: []string{"finished-job-started-hour-ago"},
|
||||
},
|
||||
{
|
||||
name: "jobs are not deleted if history limit not reached",
|
||||
now: justBeforeTheHour(),
|
||||
cronJob: &batchv1.CronJob{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"},
|
||||
Spec: batchv1.CronJobSpec{
|
||||
Schedule: onTheHour,
|
||||
// History limit of 2 with a single finished job below: nothing is expected to be removed.
SuccessfulJobsHistoryLimit: pointer.Int32(2),
|
||||
JobTemplate: batchv1.JobTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}},
|
||||
},
|
||||
},
|
||||
Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}},
|
||||
},
|
||||
finishedJobs: []*batchv1.Job{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "foo-ns",
|
||||
Name: "finished-job-started-hour-ago",
|
||||
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
|
||||
},
|
||||
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}},
|
||||
},
|
||||
},
|
||||
jobCreateError: nil,
|
||||
expectedDeletedJobs: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
|
||||
// Give every fixture job a JobComplete=True condition so it counts as finished.
for _, job := range tt.finishedJobs {
|
||||
job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: v1.ConditionTrue}}
|
||||
}
|
||||
|
||||
client := fake.NewSimpleClientset()
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
// Seed the informer indexers directly; the Add errors are deliberately discarded.
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(tt.cronJob)
|
||||
for _, job := range tt.finishedJobs {
|
||||
_ = informerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
|
||||
}
|
||||
|
||||
jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
return
|
||||
}
|
||||
// Swap in the fake job control so deletions can be inspected and creation can be made to fail.
jobControl := &fakeJobControl{CreateErr: tt.jobCreateError}
|
||||
jm.jobControl = jobControl
|
||||
// Pin the controller's clock to the test case's time.
jm.now = func() time.Time {
|
||||
return tt.now
|
||||
}
|
||||
|
||||
// Queue the CronJob and process exactly one work item; the return value is not checked.
jm.enqueueController(tt.cronJob)
|
||||
jm.processNextWorkItem(ctx)
|
||||
|
||||
// Compare lengths first so the index into expectedDeletedJobs below stays in range.
if len(tt.expectedDeletedJobs) != len(jobControl.DeleteJobName) {
|
||||
t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName)
|
||||
}
|
||||
// Sort both lists so the element-wise comparison does not depend on deletion order.
sort.Strings(jobControl.DeleteJobName)
|
||||
sort.Strings(tt.expectedDeletedJobs)
|
||||
for i, deletedJob := range jobControl.DeleteJobName {
|
||||
if deletedJob != tt.expectedDeletedJobs[i] {
|
||||
t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue