2015-08-27 08:19:35 -04:00
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"reflect"
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
2015-09-17 18:21:55 -04:00
"k8s.io/kubernetes/pkg/api/unversioned"
2015-10-09 18:04:41 -04:00
"k8s.io/kubernetes/pkg/apis/extensions"
2015-08-27 08:19:35 -04:00
"k8s.io/kubernetes/pkg/client/cache"
2016-02-05 16:58:03 -05:00
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
2015-08-27 08:19:35 -04:00
"k8s.io/kubernetes/pkg/client/record"
2016-02-16 12:54:53 -05:00
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
2015-08-27 08:19:35 -04:00
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
2015-10-09 23:58:57 -04:00
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
2015-08-27 08:19:35 -04:00
"k8s.io/kubernetes/pkg/runtime"
2016-01-15 02:32:10 -05:00
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
2016-02-02 05:57:06 -05:00
"k8s.io/kubernetes/pkg/util/wait"
2015-08-27 08:19:35 -04:00
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
2015-09-17 22:16:04 -04:00
type JobController struct {
2016-01-15 00:00:58 -05:00
kubeClient clientset . Interface
2015-08-27 08:19:35 -04:00
podControl controller . PodControlInterface
2015-09-22 04:05:54 -04:00
// To allow injection of updateJobStatus for testing.
2015-10-09 18:49:10 -04:00
updateHandler func ( job * extensions . Job ) error
2015-08-27 08:19:35 -04:00
syncHandler func ( jobKey string ) error
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func ( ) bool
// A TTLCache of pod creates/deletes each rc expects to see
expectations controller . ControllerExpectationsInterface
// A store of job, populated by the jobController
jobStore cache . StoreToJobLister
// Watches changes to all jobs
jobController * framework . Controller
// A store of pods, populated by the podController
podStore cache . StoreToPodLister
// Watches changes to all pods
podController * framework . Controller
// Jobs that need to be updated
queue * workqueue . Type
2015-11-26 10:54:04 -05:00
recorder record . EventRecorder
2015-08-27 08:19:35 -04:00
}
2016-01-15 00:00:58 -05:00
func NewJobController ( kubeClient clientset . Interface , resyncPeriod controller . ResyncPeriodFunc ) * JobController {
2015-08-27 08:19:35 -04:00
eventBroadcaster := record . NewBroadcaster ( )
eventBroadcaster . StartLogging ( glog . Infof )
2016-01-15 00:00:58 -05:00
// TODO: remove the wrapper when every clients have moved to use the clientset.
2016-02-16 12:54:53 -05:00
eventBroadcaster . StartRecordingToSink ( & unversionedcore . EventSinkImpl { kubeClient . Core ( ) . Events ( "" ) } )
2015-08-27 08:19:35 -04:00
2015-09-17 22:16:04 -04:00
jm := & JobController {
2015-08-27 08:19:35 -04:00
kubeClient : kubeClient ,
podControl : controller . RealPodControl {
KubeClient : kubeClient ,
2015-11-26 10:54:04 -05:00
Recorder : eventBroadcaster . NewRecorder ( api . EventSource { Component : "job-controller" } ) ,
2015-08-27 08:19:35 -04:00
} ,
expectations : controller . NewControllerExpectations ( ) ,
queue : workqueue . New ( ) ,
2015-11-26 10:54:04 -05:00
recorder : eventBroadcaster . NewRecorder ( api . EventSource { Component : "job-controller" } ) ,
2015-08-27 08:19:35 -04:00
}
jm . jobStore . Store , jm . jobController = framework . NewInformer (
& cache . ListWatch {
2015-12-10 04:39:03 -05:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2015-12-02 10:20:48 -05:00
return jm . kubeClient . Extensions ( ) . Jobs ( api . NamespaceAll ) . List ( options )
2015-08-27 08:19:35 -04:00
} ,
2015-12-10 04:39:03 -05:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2015-11-26 10:27:45 -05:00
return jm . kubeClient . Extensions ( ) . Jobs ( api . NamespaceAll ) . Watch ( options )
2015-08-27 08:19:35 -04:00
} ,
} ,
2015-10-09 18:49:10 -04:00
& extensions . Job { } ,
2015-10-06 05:12:00 -04:00
// TODO: Can we have much longer period here?
2015-08-27 08:19:35 -04:00
replicationcontroller . FullControllerResyncPeriod ,
framework . ResourceEventHandlerFuncs {
AddFunc : jm . enqueueController ,
UpdateFunc : func ( old , cur interface { } ) {
2015-10-09 18:49:10 -04:00
if job := cur . ( * extensions . Job ) ; ! isJobFinished ( job ) {
2015-10-01 17:35:58 -04:00
jm . enqueueController ( job )
2015-08-27 08:19:35 -04:00
}
} ,
DeleteFunc : jm . enqueueController ,
} ,
)
jm . podStore . Store , jm . podController = framework . NewInformer (
& cache . ListWatch {
2015-12-10 04:39:03 -05:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2016-02-03 16:21:05 -05:00
return jm . kubeClient . Core ( ) . Pods ( api . NamespaceAll ) . List ( options )
2015-08-27 08:19:35 -04:00
} ,
2015-12-10 04:39:03 -05:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2016-02-03 16:21:05 -05:00
return jm . kubeClient . Core ( ) . Pods ( api . NamespaceAll ) . Watch ( options )
2015-08-27 08:19:35 -04:00
} ,
} ,
& api . Pod { } ,
2015-10-06 05:12:00 -04:00
resyncPeriod ( ) ,
2015-08-27 08:19:35 -04:00
framework . ResourceEventHandlerFuncs {
AddFunc : jm . addPod ,
UpdateFunc : jm . updatePod ,
DeleteFunc : jm . deletePod ,
} ,
)
2015-09-22 04:05:54 -04:00
jm . updateHandler = jm . updateJobStatus
2015-08-27 08:19:35 -04:00
jm . syncHandler = jm . syncJob
jm . podStoreSynced = jm . podController . HasSynced
return jm
}
// Run the main goroutine responsible for watching and syncing jobs.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 02:32:10 -05:00
defer utilruntime . HandleCrash ( )
2015-08-27 08:19:35 -04:00
go jm . jobController . Run ( stopCh )
go jm . podController . Run ( stopCh )
for i := 0 ; i < workers ; i ++ {
2016-02-02 05:57:06 -05:00
go wait . Until ( jm . worker , time . Second , stopCh )
2015-08-27 08:19:35 -04:00
}
<- stopCh
glog . Infof ( "Shutting down Job Manager" )
jm . queue . ShutDown ( )
}
// getPodJob returns the job managing the given pod.
2015-10-09 18:49:10 -04:00
func ( jm * JobController ) getPodJob ( pod * api . Pod ) * extensions . Job {
2015-08-27 08:19:35 -04:00
jobs , err := jm . jobStore . GetPodJobs ( pod )
if err != nil {
2015-09-17 22:16:04 -04:00
glog . V ( 4 ) . Infof ( "No jobs found for pod %v, job controller will avoid syncing" , pod . Name )
2015-08-27 08:19:35 -04:00
return nil
}
2015-09-18 15:16:38 -04:00
if len ( jobs ) > 1 {
glog . Errorf ( "user error! more than one job is selecting pods with labels: %+v" , pod . Labels )
sort . Sort ( byCreationTimestamp ( jobs ) )
}
2015-08-27 08:19:35 -04:00
return & jobs [ 0 ]
}
// When a pod is created, enqueue the controller that manages it and update it's expectations.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) addPod ( obj interface { } ) {
2015-08-27 08:19:35 -04:00
pod := obj . ( * api . Pod )
if pod . DeletionTimestamp != nil {
2015-09-17 22:16:04 -04:00
// on a restart of the controller controller, it's possible a new pod shows up in a state that
2015-08-27 08:19:35 -04:00
// is already pending deletion. Prevent the pod from being a creation observation.
jm . deletePod ( pod )
return
}
if job := jm . getPodJob ( pod ) ; job != nil {
jobKey , err := controller . KeyFunc ( job )
if err != nil {
glog . Errorf ( "Couldn't get key for job %#v: %v" , job , err )
return
}
jm . expectations . CreationObserved ( jobKey )
jm . enqueueController ( job )
}
}
// When a pod is updated, figure out what job/s manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *api.Pod types.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) updatePod ( old , cur interface { } ) {
2015-08-27 08:19:35 -04:00
if api . Semantic . DeepEqual ( old , cur ) {
// A periodic relist will send update events for all known pods.
return
}
curPod := cur . ( * api . Pod )
if curPod . DeletionTimestamp != nil {
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an job to create more pods asap, not wait
// until the kubelet actually deletes the pod.
jm . deletePod ( curPod )
return
}
if job := jm . getPodJob ( curPod ) ; job != nil {
jm . enqueueController ( job )
}
oldPod := old . ( * api . Pod )
// Only need to get the old job if the labels changed.
if ! reflect . DeepEqual ( curPod . Labels , oldPod . Labels ) {
// If the old and new job are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldJob := jm . getPodJob ( oldPod ) ; oldJob != nil {
jm . enqueueController ( oldJob )
}
}
}
// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) deletePod ( obj interface { } ) {
2015-08-27 08:19:35 -04:00
pod , ok := obj . ( * api . Pod )
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new job will not be woken up till the periodic resync.
if ! ok {
tombstone , ok := obj . ( cache . DeletedFinalStateUnknown )
if ! ok {
2016-02-24 17:01:48 -05:00
glog . Errorf ( "Couldn't get object from tombstone %+v" , obj )
2015-08-27 08:19:35 -04:00
return
}
pod , ok = tombstone . Obj . ( * api . Pod )
if ! ok {
2016-02-24 17:01:48 -05:00
glog . Errorf ( "Tombstone contained object that is not a pod %+v" , obj )
2015-08-27 08:19:35 -04:00
return
}
}
if job := jm . getPodJob ( pod ) ; job != nil {
jobKey , err := controller . KeyFunc ( job )
if err != nil {
glog . Errorf ( "Couldn't get key for job %#v: %v" , job , err )
return
}
jm . expectations . DeletionObserved ( jobKey )
jm . enqueueController ( job )
}
}
2015-10-09 18:49:10 -04:00
// obj could be an *extensions.Job, or a DeletionFinalStateUnknown marker item.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) enqueueController ( obj interface { } ) {
2015-08-27 08:19:35 -04:00
key , err := controller . KeyFunc ( obj )
if err != nil {
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
return
}
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
// ensure that the same controller is synced for a given pod. When we periodically relist
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them.
jm . queue . Add ( key )
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) worker ( ) {
2015-08-27 08:19:35 -04:00
for {
func ( ) {
key , quit := jm . queue . Get ( )
if quit {
return
}
defer jm . queue . Done ( key )
err := jm . syncHandler ( key . ( string ) )
if err != nil {
glog . Errorf ( "Error syncing job: %v" , err )
}
} ( )
}
}
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
2015-09-17 22:16:04 -04:00
func ( jm * JobController ) syncJob ( key string ) error {
2015-08-27 08:19:35 -04:00
startTime := time . Now ( )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing job %q (%v)" , key , time . Now ( ) . Sub ( startTime ) )
} ( )
2016-02-12 22:47:33 -05:00
if ! jm . podStoreSynced ( ) {
// Sleep so we give the pod reflector goroutine a chance to run.
time . Sleep ( replicationcontroller . PodStoreSyncedPollPeriod )
glog . V ( 4 ) . Infof ( "Waiting for pods controller to sync, requeuing job %v" , key )
jm . queue . Add ( key )
return nil
}
2015-08-27 08:19:35 -04:00
obj , exists , err := jm . jobStore . Store . GetByKey ( key )
if ! exists {
2015-09-18 15:09:49 -04:00
glog . V ( 4 ) . Infof ( "Job has been deleted: %v" , key )
2015-08-27 08:19:35 -04:00
jm . expectations . DeleteExpectations ( key )
return nil
}
if err != nil {
2015-09-18 15:09:49 -04:00
glog . Errorf ( "Unable to retrieve job %v from store: %v" , key , err )
2015-08-27 08:19:35 -04:00
jm . queue . Add ( key )
return err
}
2015-10-09 18:49:10 -04:00
job := * obj . ( * extensions . Job )
2015-08-27 08:19:35 -04:00
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobKey , err := controller . KeyFunc ( & job )
if err != nil {
glog . Errorf ( "Couldn't get key for job %#v: %v" , job , err )
return err
}
jobNeedsSync := jm . expectations . SatisfiedExpectations ( jobKey )
2016-02-02 00:34:42 -05:00
selector , _ := unversioned . LabelSelectorAsSelector ( job . Spec . Selector )
2015-10-14 14:04:33 -04:00
podList , err := jm . podStore . Pods ( job . Namespace ) . List ( selector )
2015-08-27 08:19:35 -04:00
if err != nil {
glog . Errorf ( "Error getting pods for job %q: %v" , key , err )
jm . queue . Add ( key )
return err
}
activePods := controller . FilterActivePods ( podList . Items )
active := len ( activePods )
2015-10-08 13:33:39 -04:00
succeeded , failed := getStatus ( podList . Items )
2015-11-26 10:54:04 -05:00
conditions := len ( job . Status . Conditions )
if job . Status . StartTime == nil {
now := unversioned . Now ( )
job . Status . StartTime = & now
2015-08-27 08:19:35 -04:00
}
2016-01-12 08:59:14 -05:00
// if job was finished previously, we don't want to redo the termination
if isJobFinished ( & job ) {
return nil
}
2015-11-26 10:54:04 -05:00
if pastActiveDeadline ( & job ) {
// TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution
// exists ATM. There's an open discussion in the topic in
// https://github.com/kubernetes/kubernetes/issues/14602 which might give
// some sort of solution to above problem.
// kill remaining active pods
wait := sync . WaitGroup { }
wait . Add ( active )
for i := 0 ; i < active ; i ++ {
go func ( ix int ) {
defer wait . Done ( )
2015-11-27 11:36:39 -05:00
if err := jm . podControl . DeletePod ( job . Namespace , activePods [ ix ] . Name , & job ) ; err != nil {
2016-01-15 02:32:10 -05:00
defer utilruntime . HandleError ( err )
2015-11-26 10:54:04 -05:00
}
} ( i )
}
wait . Wait ( )
// update status values accordingly
failed += active
active = 0
job . Status . Conditions = append ( job . Status . Conditions , newCondition ( extensions . JobFailed , "DeadlineExceeded" , "Job was active longer than specified deadline" ) )
jm . recorder . Event ( & job , api . EventTypeNormal , "DeadlineExceeded" , "Job was active longer than specified deadline" )
} else {
if jobNeedsSync {
active = jm . manageJob ( activePods , succeeded , & job )
}
completions := succeeded
2015-12-14 18:26:16 -05:00
complete := false
if job . Spec . Completions == nil {
// This type of job is complete when any pod exits with success.
// Each pod is capable of
// determining whether or not the entire Job is done. Subsequent pods are
// not expected to fail, but if they do, the failure is ignored. Once any
// pod succeeds, the controller waits for remaining pods to finish, and
// then the job is complete.
if succeeded > 0 && active == 0 {
complete = true
}
} else {
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
if completions >= * job . Spec . Completions {
complete = true
if active > 0 {
jm . recorder . Event ( & job , api . EventTypeWarning , "TooManyActivePods" , "Too many active pods running after completion count reached" )
}
if completions > * job . Spec . Completions {
jm . recorder . Event ( & job , api . EventTypeWarning , "TooManySucceededPods" , "Too many succeeded pods running after completion count reached" )
}
}
}
if complete {
2015-11-26 10:54:04 -05:00
job . Status . Conditions = append ( job . Status . Conditions , newCondition ( extensions . JobComplete , "" , "" ) )
now := unversioned . Now ( )
job . Status . CompletionTime = & now
}
2015-08-27 08:19:35 -04:00
}
// no need to update the job if the status hasn't changed since last time
2015-11-26 10:54:04 -05:00
if job . Status . Active != active || job . Status . Succeeded != succeeded || job . Status . Failed != failed || len ( job . Status . Conditions ) != conditions {
2015-08-27 08:19:35 -04:00
job . Status . Active = active
2015-10-08 13:33:39 -04:00
job . Status . Succeeded = succeeded
job . Status . Failed = failed
2015-08-27 08:19:35 -04:00
if err := jm . updateHandler ( & job ) ; err != nil {
2015-09-22 00:26:06 -04:00
glog . Errorf ( "Failed to update job %v, requeuing. Error: %v" , job . Name , err )
2015-08-27 08:19:35 -04:00
jm . enqueueController ( & job )
}
}
return nil
}
2015-11-26 10:54:04 -05:00
// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline ( job * extensions . Job ) bool {
if job . Spec . ActiveDeadlineSeconds == nil || job . Status . StartTime == nil {
return false
}
now := unversioned . Now ( )
start := job . Status . StartTime . Time
duration := now . Time . Sub ( start )
allowedDuration := time . Duration ( * job . Spec . ActiveDeadlineSeconds ) * time . Second
return duration >= allowedDuration
}
func newCondition ( conditionType extensions . JobConditionType , reason , message string ) extensions . JobCondition {
2015-10-09 18:49:10 -04:00
return extensions . JobCondition {
2015-11-26 10:54:04 -05:00
Type : conditionType ,
2015-08-27 08:19:35 -04:00
Status : api . ConditionTrue ,
2015-09-17 18:21:55 -04:00
LastProbeTime : unversioned . Now ( ) ,
LastTransitionTime : unversioned . Now ( ) ,
2015-11-26 10:54:04 -05:00
Reason : reason ,
Message : message ,
2015-08-27 08:19:35 -04:00
}
}
2015-11-26 10:54:04 -05:00
// getStatus returns no of succeeded and failed pods running a job
2015-10-08 13:33:39 -04:00
func getStatus ( pods [ ] api . Pod ) ( succeeded , failed int ) {
succeeded = filterPods ( pods , api . PodSucceeded )
failed = filterPods ( pods , api . PodFailed )
2015-08-27 08:19:35 -04:00
return
}
2015-11-26 10:54:04 -05:00
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
2015-10-09 18:49:10 -04:00
func ( jm * JobController ) manageJob ( activePods [ ] * api . Pod , succeeded int , job * extensions . Job ) int {
2015-09-18 15:02:59 -04:00
var activeLock sync . Mutex
2015-08-27 08:19:35 -04:00
active := len ( activePods )
parallelism := * job . Spec . Parallelism
jobKey , err := controller . KeyFunc ( job )
if err != nil {
glog . Errorf ( "Couldn't get key for job %#v: %v" , job , err )
return 0
}
if active > parallelism {
diff := active - parallelism
jm . expectations . ExpectDeletions ( jobKey , diff )
2015-09-18 15:09:49 -04:00
glog . V ( 4 ) . Infof ( "Too many pods running job %q, need %d, deleting %d" , jobKey , parallelism , diff )
2015-08-27 08:19:35 -04:00
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort . Sort ( controller . ActivePods ( activePods ) )
active -= diff
wait := sync . WaitGroup { }
wait . Add ( diff )
for i := 0 ; i < diff ; i ++ {
go func ( ix int ) {
defer wait . Done ( )
2015-11-27 11:36:39 -05:00
if err := jm . podControl . DeletePod ( job . Namespace , activePods [ ix ] . Name , job ) ; err != nil {
2016-01-15 02:32:10 -05:00
defer utilruntime . HandleError ( err )
2015-08-27 08:19:35 -04:00
// Decrement the expected number of deletes because the informer won't observe this deletion
jm . expectations . DeletionObserved ( jobKey )
2015-09-18 15:02:59 -04:00
activeLock . Lock ( )
2015-08-27 08:19:35 -04:00
active ++
2015-09-18 15:02:59 -04:00
activeLock . Unlock ( )
2015-08-27 08:19:35 -04:00
}
} ( i )
}
wait . Wait ( )
} else if active < parallelism {
2015-12-14 18:26:16 -05:00
wantActive := 0
if job . Spec . Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = * job . Spec . Completions - succeeded
if wantActive > parallelism {
wantActive = parallelism
}
}
diff := wantActive - active
if diff < 0 {
glog . Errorf ( "More active than wanted: job %q, want %d, have %d" , jobKey , wantActive , active )
diff = 0
2015-08-27 08:19:35 -04:00
}
jm . expectations . ExpectCreations ( jobKey , diff )
2015-12-14 18:26:16 -05:00
glog . V ( 4 ) . Infof ( "Too few pods running job %q, need %d, creating %d" , jobKey , wantActive , diff )
2015-08-27 08:19:35 -04:00
active += diff
wait := sync . WaitGroup { }
wait . Add ( diff )
for i := 0 ; i < diff ; i ++ {
go func ( ) {
defer wait . Done ( )
2015-09-25 15:07:06 -04:00
if err := jm . podControl . CreatePods ( job . Namespace , & job . Spec . Template , job ) ; err != nil {
2016-01-15 02:32:10 -05:00
defer utilruntime . HandleError ( err )
2015-08-27 08:19:35 -04:00
// Decrement the expected number of creates because the informer won't observe this pod
jm . expectations . CreationObserved ( jobKey )
2015-09-18 15:02:59 -04:00
activeLock . Lock ( )
2015-08-27 08:19:35 -04:00
active --
2015-09-18 15:02:59 -04:00
activeLock . Unlock ( )
2015-08-27 08:19:35 -04:00
}
} ( )
}
wait . Wait ( )
}
return active
}
2015-10-09 18:49:10 -04:00
func ( jm * JobController ) updateJobStatus ( job * extensions . Job ) error {
2015-10-12 14:18:50 -04:00
_ , err := jm . kubeClient . Extensions ( ) . Jobs ( job . Namespace ) . UpdateStatus ( job )
2015-08-27 08:19:35 -04:00
return err
}
// filterPods returns pods based on their phase.
func filterPods ( pods [ ] api . Pod , phase api . PodPhase ) int {
result := 0
for i := range pods {
if phase == pods [ i ] . Status . Phase {
result ++
}
}
return result
}
2015-09-18 15:16:38 -04:00
2015-10-09 18:49:10 -04:00
func isJobFinished ( j * extensions . Job ) bool {
2015-10-01 17:35:58 -04:00
for _ , c := range j . Status . Conditions {
2015-11-26 10:54:04 -05:00
if ( c . Type == extensions . JobComplete || c . Type == extensions . JobFailed ) && c . Status == api . ConditionTrue {
2015-10-01 17:35:58 -04:00
return true
}
}
return false
}
2015-09-18 15:16:38 -04:00
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
2015-10-09 18:49:10 -04:00
type byCreationTimestamp [ ] extensions . Job
2015-09-18 15:16:38 -04:00
func ( o byCreationTimestamp ) Len ( ) int { return len ( o ) }
func ( o byCreationTimestamp ) Swap ( i , j int ) { o [ i ] , o [ j ] = o [ j ] , o [ i ] }
func ( o byCreationTimestamp ) Less ( i , j int ) bool {
if o [ i ] . CreationTimestamp . Equal ( o [ j ] . CreationTimestamp ) {
return o [ i ] . Name < o [ j ] . Name
}
return o [ i ] . CreationTimestamp . Before ( o [ j ] . CreationTimestamp )
}