kubernetes/pkg/controller/replication_controller.go

232 lines
6.9 KiB
Go
Raw Normal View History

2014-06-06 19:40:48 -04:00
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
2014-06-06 19:40:48 -04:00
import (
"encoding/json"
"fmt"
"sync"
2014-06-06 19:40:48 -04:00
"time"
2014-06-12 16:17:34 -04:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
2014-06-06 19:40:48 -04:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
2014-06-22 20:02:48 -04:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
2014-06-06 19:40:48 -04:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
2014-06-06 19:40:48 -04:00
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
2014-06-09 01:38:45 -04:00
// with actual running pods.
2014-06-06 19:40:48 -04:00
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
etcdClient tools.EtcdClient
kubeClient client.Interface
2014-06-09 01:38:45 -04:00
podControl PodControlInterface
2014-06-17 20:56:18 -04:00
syncTime <-chan time.Time
// To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error
2014-06-06 19:40:48 -04:00
}
2014-07-10 07:47:10 -04:00
// PodControlInterface is an interface that knows how to add or delete pods
2014-06-06 19:40:48 -04:00
// created as an interface to allow testing.
2014-06-09 01:38:45 -04:00
type PodControlInterface interface {
2014-07-10 07:47:10 -04:00
// createReplica creates new replicated pods according to the spec.
2014-06-12 16:17:34 -04:00
createReplica(controllerSpec api.ReplicationController)
2014-07-10 07:47:10 -04:00
// deletePod deletes the pod identified by podID.
2014-06-09 01:38:45 -04:00
deletePod(podID string) error
2014-06-06 19:40:48 -04:00
}
2014-07-10 07:47:10 -04:00
// RealPodControl is the default implementation of PodControllerInterface.
2014-06-09 01:38:45 -04:00
type RealPodControl struct {
kubeClient client.Interface
2014-06-06 19:40:48 -04:00
}
2014-06-12 16:17:34 -04:00
func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) {
2014-06-09 00:39:57 -04:00
labels := controllerSpec.DesiredState.PodTemplate.Labels
2014-06-06 19:40:48 -04:00
if labels != nil {
labels["replicationController"] = controllerSpec.ID
}
2014-06-12 16:17:34 -04:00
pod := api.Pod{
2014-06-09 00:39:57 -04:00
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
Labels: controllerSpec.DesiredState.PodTemplate.Labels,
2014-06-06 19:40:48 -04:00
}
2014-06-09 01:38:45 -04:00
_, err := r.kubeClient.CreatePod(pod)
2014-06-06 19:40:48 -04:00
if err != nil {
glog.Errorf("%#v\n", err)
2014-06-06 19:40:48 -04:00
}
}
2014-06-09 01:38:45 -04:00
func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID)
2014-06-06 19:40:48 -04:00
}
2014-07-10 07:47:10 -04:00
// MakeReplicationManager craetes a new ReplicationManager.
func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Interface) *ReplicationManager {
rm := &ReplicationManager{
2014-06-06 19:40:48 -04:00
kubeClient: kubeClient,
etcdClient: etcdClient,
2014-06-09 01:38:45 -04:00
podControl: RealPodControl{
2014-06-06 19:40:48 -04:00
kubeClient: kubeClient,
},
}
rm.syncHandler = func(controllerSpec api.ReplicationController) error {
return rm.syncReplicationController(controllerSpec)
}
return rm
2014-06-06 19:40:48 -04:00
}
2014-07-10 07:47:10 -04:00
// Run begins watching and syncing.
2014-06-17 19:42:29 -04:00
func (rm *ReplicationManager) Run(period time.Duration) {
2014-06-17 20:56:18 -04:00
rm.syncTime = time.Tick(period)
2014-06-17 19:42:29 -04:00
go util.Forever(func() { rm.watchControllers() }, period)
}
func (rm *ReplicationManager) watchControllers() {
2014-06-06 19:40:48 -04:00
watchChannel := make(chan *etcd.Response)
stop := make(chan bool)
// Ensure that the call to watch ends.
defer close(stop)
2014-06-13 21:11:32 -04:00
go func() {
defer util.HandleCrash()
_, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop)
if err == etcd.ErrWatchStoppedByUser {
close(watchChannel)
} else {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
2014-06-13 21:11:32 -04:00
}()
2014-06-06 19:40:48 -04:00
for {
2014-06-17 20:56:18 -04:00
select {
case <-rm.syncTime:
rm.synchronize()
case watchResponse, open := <-watchChannel:
if !open || watchResponse == nil {
// watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever()
// that called us call us again.
2014-06-17 20:56:18 -04:00
return
}
glog.Infof("Got watch: %#v", watchResponse)
2014-06-17 20:56:18 -04:00
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
glog.Errorf("Error handling data: %#v, %#v", err, watchResponse)
2014-06-17 20:56:18 -04:00
continue
}
rm.syncHandler(*controller)
2014-06-06 19:40:48 -04:00
}
}
}
2014-06-12 16:17:34 -04:00
func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api.ReplicationController, error) {
switch response.Action {
case "set":
if response.Node == nil {
return nil, fmt.Errorf("response node is null %#v", response)
2014-06-06 19:40:48 -04:00
}
var controllerSpec api.ReplicationController
if err := json.Unmarshal([]byte(response.Node.Value), &controllerSpec); err != nil {
return nil, err
}
return &controllerSpec, nil
case "delete":
// Ensure that the final state of a replication controller is applied before it is deleted.
// Otherwise, a replication controller could be modified and then deleted (for example, from 3 to 0
// replicas), and it would be non-deterministic which of its pods continued to exist.
if response.PrevNode == nil {
return nil, fmt.Errorf("previous node is null %#v", response)
}
var controllerSpec api.ReplicationController
if err := json.Unmarshal([]byte(response.PrevNode.Value), &controllerSpec); err != nil {
return nil, err
}
return &controllerSpec, nil
2014-06-06 19:40:48 -04:00
}
return nil, nil
}
2014-06-12 16:17:34 -04:00
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
2014-06-09 01:38:45 -04:00
for _, value := range pods {
if api.PodStopped != value.CurrentState.Status {
2014-06-06 19:40:48 -04:00
result = append(result, value)
}
}
return result
}
2014-06-12 16:17:34 -04:00
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
2014-06-22 20:02:48 -04:00
s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector()
podList, err := rm.kubeClient.ListPods(s)
2014-06-06 19:40:48 -04:00
if err != nil {
return err
}
2014-06-09 01:38:45 -04:00
filteredList := rm.filterActivePods(podList.Items)
2014-06-06 19:40:48 -04:00
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
if diff < 0 {
diff *= -1
wait := sync.WaitGroup{}
wait.Add(diff)
glog.Infof("Too few replicas, creating %d\n", diff)
2014-06-06 19:40:48 -04:00
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
rm.podControl.createReplica(controllerSpec)
}()
2014-06-06 19:40:48 -04:00
}
wait.Wait()
2014-06-06 19:40:48 -04:00
} else if diff > 0 {
glog.Infof("Too many replicas, deleting %d\n", diff)
wait := sync.WaitGroup{}
wait.Add(diff)
2014-06-06 19:40:48 -04:00
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(filteredList[ix].ID)
}(i)
2014-06-06 19:40:48 -04:00
}
wait.Wait()
2014-06-06 19:40:48 -04:00
}
return nil
}
2014-06-17 19:42:29 -04:00
func (rm *ReplicationManager) synchronize() {
2014-06-17 20:56:18 -04:00
var controllerSpecs []api.ReplicationController
helper := tools.EtcdHelper{rm.etcdClient}
2014-06-17 20:56:18 -04:00
err := helper.ExtractList("/registry/controllers", &controllerSpecs)
2014-06-17 19:42:29 -04:00
if err != nil {
glog.Errorf("Synchronization error: %v (%#v)", err, err)
return
2014-06-17 19:42:29 -04:00
}
wg := sync.WaitGroup{}
wg.Add(len(controllerSpecs))
for ix := range controllerSpecs {
go func(ix int) {
defer wg.Done()
err := rm.syncHandler(controllerSpecs[ix])
if err != nil {
glog.Errorf("Error synchronizing: %#v", err)
}
}(ix)
2014-06-06 19:40:48 -04:00
}
wg.Wait()
2014-06-06 19:40:48 -04:00
}