Merge pull request #178 from totherme/control-pane-process-timeouts

Control plane process timeouts
This commit is contained in:
k8s-ci-robot 2017-12-14 20:40:29 -08:00 committed by GitHub
commit 129504ba58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 80 deletions

View file

@ -17,6 +17,8 @@ type APIServer struct {
ProcessStarter simpleSessionStarter
CertDirManager certDirManager
Etcd ControlPlaneProcess
StopTimeout time.Duration
StartTimeout time.Duration
session SimpleSession
stdOut *gbytes.Buffer
stdErr *gbytes.Buffer
@ -87,7 +89,7 @@ func (s *APIServer) Start() error {
}
detectedStart := s.stdErr.Detect(fmt.Sprintf("Serving insecurely on %s:%d", addr, port))
timedOut := time.After(20 * time.Second)
timedOut := time.After(s.StartTimeout)
command := exec.Command(s.PathFinder("kube-apiserver"), args...)
s.session, err = s.ProcessStarter(command, s.stdOut, s.stdErr)
@ -121,6 +123,12 @@ func (s *APIServer) ensureInitialized() {
if s.Etcd == nil {
s.Etcd = &Etcd{}
}
if s.StopTimeout == 0 {
s.StopTimeout = 20 * time.Second
}
if s.StartTimeout == 0 {
s.StartTimeout = 20 * time.Second
}
s.stdOut = gbytes.NewBuffer()
s.stdErr = gbytes.NewBuffer()
@ -132,18 +140,22 @@ func (s *APIServer) Stop() error {
return nil
}
s.session.Terminate()
// TODO have a better way to handle the timeout of Stop()
s.session.Wait(20 * time.Second)
session := s.session.Terminate()
detectedStop := session.Exited
timedOut := time.After(s.StopTimeout)
err := s.Etcd.Stop()
if err != nil {
select {
case <-detectedStop:
break
case <-timedOut:
return fmt.Errorf("timeout waiting for apiserver to stop")
}
if err := s.Etcd.Stop(); err != nil {
return err
}
err = s.CertDirManager.Destroy()
return err
return s.CertDirManager.Destroy()
}
// ExitCode returns the exit code of the process, if it has exited. If it hasn't exited yet, ExitCode returns -1.

View file

@ -8,6 +8,8 @@ import (
"fmt"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
@ -23,6 +25,7 @@ var _ = Describe("Apiserver", func() {
fakeEtcdProcess *testfakes.FakeControlPlaneProcess
fakePathFinder *testfakes.FakeBinPathFinder
fakeAddressManager *testfakes.FakeAddressManager
apiServerStopper chan struct{}
)
BeforeEach(func() {
@ -32,11 +35,18 @@ var _ = Describe("Apiserver", func() {
fakePathFinder = &testfakes.FakeBinPathFinder{}
fakeAddressManager = &testfakes.FakeAddressManager{}
apiServerStopper = make(chan struct{}, 1)
fakeSession.TerminateReturns(&gexec.Session{
Exited: apiServerStopper,
})
close(apiServerStopper)
apiServer = &APIServer{
AddressManager: fakeAddressManager,
PathFinder: fakePathFinder.Spy,
CertDirManager: fakeCertDirManager,
Etcd: fakeEtcdProcess,
StopTimeout: 500 * time.Millisecond,
}
})
@ -93,13 +103,12 @@ var _ = Describe("Apiserver", func() {
Expect(fakeCertDirManager.CreateCallCount()).To(Equal(1))
By("Stopping the API Server")
apiServer.Stop()
Expect(apiServer.Stop()).To(Succeed())
Expect(fakeCertDirManager.DestroyCallCount()).To(Equal(1))
Expect(fakeEtcdProcess.StopCallCount()).To(Equal(1))
Expect(apiServer).To(gexec.Exit(143))
Expect(fakeSession.TerminateCallCount()).To(Equal(1))
Expect(fakeSession.WaitCallCount()).To(Equal(1))
Expect(fakeSession.ExitCodeCallCount()).To(Equal(2))
Expect(fakeCertDirManager.DestroyCallCount()).To(Equal(1))
})
@ -225,6 +234,18 @@ var _ = Describe("Apiserver", func() {
})
})
Context("when the starter takes longer than our timeout", func() {
It("gives us a timeout error", func() {
apiServer.StartTimeout = 1 * time.Nanosecond
apiServer.ProcessStarter = func(command *exec.Cmd, out, err io.Writer) (SimpleSession, error) {
return &gexec.Session{}, nil
}
err := apiServer.Start()
Expect(err).To(MatchError(ContainSubstring("timeout waiting for apiserver to start serving")))
})
})
Context("when we try to stop a server that hasn't been started", func() {
It("is a noop and does not call exit on the session", func() {
apiServer.ProcessStarter = func(command *exec.Cmd, out, err io.Writer) (SimpleSession, error) {
@ -235,6 +256,25 @@ var _ = Describe("Apiserver", func() {
})
})
Context("when Stop() times out", func() {
JustBeforeEach(func() {
apiServerStopperWillNeverBeUsed := make(chan struct{}, 1)
fakeSession.TerminateReturns(&gexec.Session{
Exited: apiServerStopperWillNeverBeUsed,
})
})
It("propagates the error", func() {
fakeAddressManager.InitializeReturns(1234, "this.is.apiserver", nil)
apiServer.ProcessStarter = func(Command *exec.Cmd, out, err io.Writer) (SimpleSession, error) {
fmt.Fprint(err, "Serving insecurely on this.is.apiserver:1234")
return fakeSession, nil
}
Expect(apiServer.Start()).To(Succeed())
err := apiServer.Stop()
Expect(err).To(MatchError(ContainSubstring("timeout")))
})
})
})
Describe("querying the server for its URL", func() {

View file

@ -16,6 +16,8 @@ type Etcd struct {
PathFinder BinPathFinder
ProcessStarter simpleSessionStarter
DataDirManager dataDirManager
StopTimeout time.Duration
StartTimeout time.Duration
session SimpleSession
stdOut *gbytes.Buffer
stdErr *gbytes.Buffer
@ -32,7 +34,6 @@ type dataDirManager interface {
type SimpleSession interface {
Buffer() *gbytes.Buffer
ExitCode() int
Wait(timeout ...interface{}) *gexec.Session
Terminate() *gexec.Session
}
@ -81,7 +82,7 @@ func (e *Etcd) Start() error {
detectedStart := e.stdErr.Detect(fmt.Sprintf(
"serving insecure client requests on %s", host))
timedOut := time.After(20 * time.Second)
timedOut := time.After(e.StartTimeout)
command := exec.Command(e.PathFinder("etcd"), args...)
e.session, err = e.ProcessStarter(command, e.stdOut, e.stdErr)
@ -113,6 +114,12 @@ func (e *Etcd) ensureInitialized() {
if e.DataDirManager == nil {
e.DataDirManager = NewTempDirManager()
}
if e.StopTimeout == 0 {
e.StopTimeout = 20 * time.Second
}
if e.StartTimeout == 0 {
e.StartTimeout = 20 * time.Second
}
e.stdOut = gbytes.NewBuffer()
e.stdErr = gbytes.NewBuffer()
@ -124,13 +131,18 @@ func (e *Etcd) Stop() error {
return nil
}
e.session.Terminate()
// TODO have a better way to handle the timeout of Stop()
e.session.Wait(20 * time.Second)
session := e.session.Terminate()
detectedStop := session.Exited
timedOut := time.After(e.StopTimeout)
err := e.DataDirManager.Destroy()
select {
case <-detectedStop:
break
case <-timedOut:
return fmt.Errorf("timeout waiting for etcd to stop")
}
return err
return e.DataDirManager.Destroy()
}
// ExitCode returns the exit code of the process, if it has exited. If it hasn't exited yet, ExitCode returns -1.

View file

@ -5,6 +5,8 @@ import (
"io"
"os/exec"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
@ -20,6 +22,7 @@ var _ = Describe("Etcd", func() {
fakePathFinder *testfakes.FakeBinPathFinder
fakeAddressManager *testfakes.FakeAddressManager
etcd *Etcd
etcdStopper chan struct{}
)
BeforeEach(func() {
@ -28,10 +31,17 @@ var _ = Describe("Etcd", func() {
fakePathFinder = &testfakes.FakeBinPathFinder{}
fakeAddressManager = &testfakes.FakeAddressManager{}
etcdStopper = make(chan struct{}, 1)
fakeSession.TerminateReturns(&gexec.Session{
Exited: etcdStopper,
})
close(etcdStopper)
etcd = &Etcd{
AddressManager: fakeAddressManager,
PathFinder: fakePathFinder.Spy,
DataDirManager: fakeDataDirManager,
StopTimeout: 500 * time.Millisecond,
}
})
@ -83,7 +93,6 @@ var _ = Describe("Etcd", func() {
Expect(fakeDataDirManager.DestroyCallCount()).To(Equal(1))
Expect(etcd).To(gexec.Exit(143))
Expect(fakeSession.TerminateCallCount()).To(Equal(1))
Expect(fakeSession.WaitCallCount()).To(Equal(1))
Expect(fakeSession.ExitCodeCallCount()).To(Equal(2))
Expect(fakeDataDirManager.DestroyCallCount()).To(Equal(1))
})
@ -144,6 +153,18 @@ var _ = Describe("Etcd", func() {
})
})
Context("when the starter takes longer than our timeout", func() {
It("gives us a timeout error", func() {
etcd.StartTimeout = 1 * time.Nanosecond
etcd.ProcessStarter = func(command *exec.Cmd, out, err io.Writer) (SimpleSession, error) {
return &gexec.Session{}, nil
}
err := etcd.Start()
Expect(err).To(MatchError(ContainSubstring("timeout waiting for etcd to start serving")))
})
})
Context("when we try to stop a server that hasn't been started", func() {
It("is a noop and does not call exit on the session", func() {
etcd.ProcessStarter = func(command *exec.Cmd, out, err io.Writer) (SimpleSession, error) {
@ -153,6 +174,27 @@ var _ = Describe("Etcd", func() {
Expect(fakeSession.ExitCodeCallCount()).To(Equal(0))
})
})
Context("when Stop() times out", func() {
JustBeforeEach(func() {
etcdStopperWillNotBeUsed := make(chan struct{})
fakeSession.TerminateReturns(&gexec.Session{
Exited: etcdStopperWillNotBeUsed,
})
})
It("propagates the error", func() {
fakeAddressManager.InitializeReturns(1234, "this.is.etcd", nil)
etcd.ProcessStarter = func(command *exec.Cmd, out, err io.Writer) (SimpleSession, error) {
fmt.Fprint(err, "serving insecure client requests on this.is.etcd:1234")
return fakeSession, nil
}
Expect(etcd.Start()).To(Succeed())
err := etcd.Stop()
Expect(err).To(MatchError(ContainSubstring("timeout")))
})
})
})
Describe("querying the server for its URL", func() {

View file

@ -28,17 +28,6 @@ type FakeSimpleSession struct {
exitCodeReturnsOnCall map[int]struct {
result1 int
}
WaitStub func(timeout ...interface{}) *gexec.Session
waitMutex sync.RWMutex
waitArgsForCall []struct {
timeout []interface{}
}
waitReturns struct {
result1 *gexec.Session
}
waitReturnsOnCall map[int]struct {
result1 *gexec.Session
}
TerminateStub func() *gexec.Session
terminateMutex sync.RWMutex
terminateArgsForCall []struct{}
@ -132,54 +121,6 @@ func (fake *FakeSimpleSession) ExitCodeReturnsOnCall(i int, result1 int) {
}{result1}
}
// Wait is the fake implementation of the session's Wait method.
//
// While holding waitMutex it looks up any per-call return value registered for
// the zero-based index of this call, appends the supplied timeout arguments to
// the call log, and records the invocation. After releasing the lock it
// returns, in priority order: the result of WaitStub if one is set, the
// per-call result if one was registered for this call index, or otherwise the
// default result stored in waitReturns.
func (fake *FakeSimpleSession) Wait(timeout ...interface{}) *gexec.Session {
	fake.waitMutex.Lock()
	// The index of this call is the number of calls recorded before it.
	callIndex := len(fake.waitArgsForCall)
	perCall, hasPerCall := fake.waitReturnsOnCall[callIndex]
	fake.waitArgsForCall = append(fake.waitArgsForCall, struct {
		timeout []interface{}
	}{timeout})
	fake.recordInvocation("Wait", []interface{}{timeout})
	fake.waitMutex.Unlock()

	switch {
	case fake.WaitStub != nil:
		return fake.WaitStub(timeout...)
	case hasPerCall:
		return perCall.result1
	default:
		return fake.waitReturns.result1
	}
}
// WaitCallCount returns the number of Wait calls recorded so far, i.e. the
// length of the recorded argument log, read under waitMutex's read lock.
func (fake *FakeSimpleSession) WaitCallCount() int {
	fake.waitMutex.RLock()
	calls := len(fake.waitArgsForCall)
	fake.waitMutex.RUnlock()
	return calls
}
// WaitArgsForCall returns the timeout arguments that were passed to the i-th
// (zero-based) recorded call to Wait. The lookup is a plain slice index, so an
// i outside the range of recorded calls panics.
func (fake *FakeSimpleSession) WaitArgsForCall(i int) []interface{} {
	fake.waitMutex.RLock()
	// Deferred so the read lock is released even if the index below panics.
	defer fake.waitMutex.RUnlock()
	recorded := fake.waitArgsForCall[i]
	return recorded.timeout
}
// WaitReturns sets the default value returned by Wait and clears any WaitStub
// so that the stub no longer takes precedence over configured return values.
func (fake *FakeSimpleSession) WaitReturns(result1 *gexec.Session) {
	fake.WaitStub = nil
	// waitReturns holds a single field, so setting it replaces the whole value.
	fake.waitReturns.result1 = result1
}
// WaitReturnsOnCall registers result1 as the value Wait returns on its i-th
// (zero-based) call and clears any WaitStub. The per-call map is created on
// first use.
func (fake *FakeSimpleSession) WaitReturnsOnCall(i int, result1 *gexec.Session) {
	fake.WaitStub = nil
	if fake.waitReturnsOnCall == nil {
		// Lazily allocate: writing to a nil map would panic.
		fake.waitReturnsOnCall = map[int]struct {
			result1 *gexec.Session
		}{}
	}
	fake.waitReturnsOnCall[i] = struct {
		result1 *gexec.Session
	}{result1: result1}
}
func (fake *FakeSimpleSession) Terminate() *gexec.Session {
fake.terminateMutex.Lock()
ret, specificReturn := fake.terminateReturnsOnCall[len(fake.terminateArgsForCall)]
@ -227,8 +168,6 @@ func (fake *FakeSimpleSession) Invocations() map[string][][]interface{} {
defer fake.bufferMutex.RUnlock()
fake.exitCodeMutex.RLock()
defer fake.exitCodeMutex.RUnlock()
fake.waitMutex.RLock()
defer fake.waitMutex.RUnlock()
fake.terminateMutex.RLock()
defer fake.terminateMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}