diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index 930c46ab077..ac4d63eb8a0 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -196,24 +196,19 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura return nil, err } - cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled) + // cronJobCopy is used to combine all the updates to a + // CronJob object and perform an actual update only once. + cronJobCopy := cronJob.DeepCopy() + + updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) + + requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled) if err != nil { logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", err) - if updateStatus { - if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil { - logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err) - return nil, err - } - } - return nil, err - } - - if jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) { - updateStatus = true } // Update the CronJob if needed - if updateStatus { + if updateStatusAfterCleanup || updateStatusAfterSync { if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil { logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err) return nil, err @@ -225,7 +220,7 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura return requeueAfter, nil } // this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format - return nil, nil + return nil, syncErr } // resolveControllerRef returns the controller referenced by a ControllerRef, @@ -416,15 +411,12 @@ func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr // syncCronJob reconciles a CronJob with a list of any Jobs that it created. // All known jobs created by "cronJob" should be included in "jobs". // The current time is passed in to facilitate testing. -// It returns a copy of the CronJob that is to be used by other functions -// that mutates the object -// It also returns a bool to indicate an update to api-server is needed +// It returns a bool to indicate an update to api-server is needed func (jm *ControllerV2) syncCronJob( ctx context.Context, cronJob *batchv1.CronJob, - jobs []*batchv1.Job) (*batchv1.CronJob, *time.Duration, bool, error) { + jobs []*batchv1.Job) (*time.Duration, bool, error) { - cronJob = cronJob.DeepCopy() now := jm.now() updateStatus := false @@ -435,7 +427,7 @@ func (jm *ControllerV2) syncCronJob( if !found && !IsJobFinished(j) { cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name) if err != nil { - return nil, nil, updateStatus, err + return nil, updateStatus, err } if inActiveList(cjCopy, j.ObjectMeta.UID) { cronJob = cjCopy @@ -483,7 +475,7 @@ func (jm *ControllerV2) syncCronJob( deleteFromActiveList(cronJob, j.UID) updateStatus = true case err != nil: - return cronJob, nil, updateStatus, err + return nil, updateStatus, err } // the job is missing in the lister but found in api-server } @@ -491,7 +483,7 @@ func (jm *ControllerV2) syncCronJob( if cronJob.DeletionTimestamp != nil { // The CronJob is being deleted. // Don't do anything other than updating status. - return cronJob, nil, updateStatus, nil + return nil, updateStatus, nil } logger := klog.FromContext(ctx) @@ -500,13 +492,13 @@ func (jm *ControllerV2) syncCronJob( if _, err := time.LoadLocation(timeZone); err != nil { logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err) - return cronJob, nil, updateStatus, nil + return nil, updateStatus, nil } } if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend { logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob)) - return cronJob, nil, updateStatus, nil + return nil, updateStatus, nil } sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder)) @@ -515,7 +507,7 @@ func (jm *ControllerV2) syncCronJob( // we should log the error and not reconcile this cronjob until an update to spec logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err) - return cronJob, nil, updateStatus, nil + return nil, updateStatus, nil } scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder) @@ -524,7 +516,7 @@ func (jm *ControllerV2) syncCronJob( // we should log the error and not reconcile this cronjob until an update to spec logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err) - return cronJob, nil, updateStatus, nil + return nil, updateStatus, nil } if scheduledTime == nil { // no unmet start time, return cj,. @@ -533,7 +525,7 @@ func (jm *ControllerV2) syncCronJob( // the scheduled time, that will give atleast 1 unmet time schedule logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob)) t := nextScheduleTimeDuration(cronJob, now, sched) - return cronJob, t, updateStatus, nil + return t, updateStatus, nil } tooLate := false @@ -552,7 +544,7 @@ func (jm *ControllerV2) syncCronJob( // and event the next time we process it, and also so the user looking at the status // can see easily that there was a missed execution. t := nextScheduleTimeDuration(cronJob, now, sched) - return cronJob, t, updateStatus, nil + return t, updateStatus, nil } if inActiveListByName(cronJob, &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -561,7 +553,7 @@ func (jm *ControllerV2) syncCronJob( }}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) { logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime) t := nextScheduleTimeDuration(cronJob, now, sched) - return cronJob, t, updateStatus, nil + return t, updateStatus, nil } if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 { // Regardless which source of information we use for the set of active jobs, @@ -576,7 +568,7 @@ func (jm *ControllerV2) syncCronJob( logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob)) jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid") t := nextScheduleTimeDuration(cronJob, now, sched) - return cronJob, t, updateStatus, nil + return t, updateStatus, nil } if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent { for _, j := range cronJob.Status.Active { @@ -584,10 +576,10 @@ func (jm *ControllerV2) syncCronJob( job, err := jm.jobControl.GetJob(j.Namespace, j.Name) if err != nil { jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err) - return cronJob, nil, updateStatus, err + return nil, updateStatus, err } if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) { - return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name) + return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name) } updateStatus = true } @@ -596,19 +588,22 @@ func (jm *ControllerV2) syncCronJob( jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime) if err != nil { logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob)) - return cronJob, nil, updateStatus, err + return nil, updateStatus, err } jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq) switch { case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause): + // if the namespace is being terminated, we don't have to do + // anything because any creation will fail + return nil, updateStatus, err case errors.IsAlreadyExists(err): // If the job is created by other actor, assume it has updated the cronjob status accordingly logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq)) - return cronJob, nil, updateStatus, err + return nil, updateStatus, err case err != nil: // default error handling jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) - return cronJob, nil, updateStatus, err + return nil, updateStatus, err } metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds()) @@ -629,14 +624,14 @@ func (jm *ControllerV2) syncCronJob( jobRef, err := getRef(jobResp) if err != nil { logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err) - return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob)) + return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob)) } cronJob.Status.Active = append(cronJob.Status.Active, *jobRef) cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime} updateStatus = true t := nextScheduleTimeDuration(cronJob, now, sched) - return cronJob, t, updateStatus, nil + return t, updateStatus, nil } func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string { diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go index 1c7864984e8..c798e0926ec 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2_test.go +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -19,6 +19,7 @@ package cronjob import ( "context" "reflect" + "sort" "strings" "testing" "time" @@ -35,6 +36,9 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2/ktesting" + "k8s.io/utils/pointer" + + "fmt" _ "k8s.io/kubernetes/pkg/apis/batch/install" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" @@ -1237,7 +1241,8 @@ func TestControllerV2SyncCronJob(t *testing.T) { return tc.now }, } - cjCopy, requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), &cj, js) + cjCopy := cj.DeepCopy() + requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), cjCopy, js) if tc.expectErr && err == nil { t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter) } @@ -1670,3 +1675,120 @@ func TestControllerV2GetJobsToBeReconciled(t *testing.T) { }) } } + +func TestControllerV2CleanupFinishedJobs(t *testing.T) { + tests := []struct { + name string + now time.Time + cronJob *batchv1.CronJob + finishedJobs []*batchv1.Job + jobCreateError error + expectedDeletedJobs []string + }{ + { + name: "jobs are still deleted when a cronjob can't create jobs due to jobs quota being reached (avoiding a deadlock)", + now: *justAfterTheHour(), + cronJob: &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}, + Spec: batchv1.CronJobSpec{ + Schedule: onTheHour, + SuccessfulJobsHistoryLimit: pointer.Int32(1), + JobTemplate: batchv1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}}, + }, + }, + Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}}, + }, + finishedJobs: []*batchv1.Job{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo-ns", + Name: "finished-job-started-hour-ago", + OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}}, + }, + Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo-ns", + Name: "finished-job-started-minute-ago", + OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}}, + }, + Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeTheHour()}}, + }, + }, + jobCreateError: errors.NewInternalError(fmt.Errorf("quota for # of jobs reached")), + expectedDeletedJobs: []string{"finished-job-started-hour-ago"}, + }, + { + name: "jobs are not deleted if history limit not reached", + now: justBeforeTheHour(), + cronJob: &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}, + Spec: batchv1.CronJobSpec{ + Schedule: onTheHour, + SuccessfulJobsHistoryLimit: pointer.Int32(2), + JobTemplate: batchv1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}}, + }, + }, + Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}}, + }, + finishedJobs: []*batchv1.Job{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo-ns", + Name: "finished-job-started-hour-ago", + OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}}, + }, + Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}}, + }, + }, + jobCreateError: nil, + expectedDeletedJobs: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + for _, job := range tt.finishedJobs { + job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: v1.ConditionTrue}} + } + + client := fake.NewSimpleClientset() + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + _ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(tt.cronJob) + for _, job := range tt.finishedJobs { + _ = informerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + } + + jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client) + if err != nil { + t.Errorf("unexpected error %v", err) + return + } + jobControl := &fakeJobControl{CreateErr: tt.jobCreateError} + jm.jobControl = jobControl + jm.now = func() time.Time { + return tt.now + } + + jm.enqueueController(tt.cronJob) + jm.processNextWorkItem(ctx) + + if len(tt.expectedDeletedJobs) != len(jobControl.DeleteJobName) { + t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName) + } + sort.Strings(jobControl.DeleteJobName) + sort.Strings(tt.expectedDeletedJobs) + for i, deletedJob := range jobControl.DeleteJobName { + if deletedJob != tt.expectedDeletedJobs[i] { + t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName) + } + } + }) + } +}