Skip to content

Commit

Permalink
Merge pull request #199 from gluke77/failure-handling-timeout
Browse files Browse the repository at this point in the history
Failure handling: timeout
  • Loading branch information
pigmej authored Feb 27, 2017
2 parents 721f649 + 4ca6ef2 commit 4114c9b
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 15 deletions.
59 changes: 59 additions & 0 deletions e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package integration

import (
"fmt"
"time"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -125,6 +126,25 @@ var _ = Describe("Basic Suite", func() {
})
})
})

// Checks that a child connected through an "on-error" dependency is still
// started when waiting for its parent times out.
Describe("Failure handling - timeout", func() {
var parentPod *v1.Pod
var childPod *v1.Pod

BeforeEach(func() {
// The parent only writes its readiness file after sleeping 15 seconds,
// which is longer than the 5 second "timeout" meta it is created with below.
parentPod = DelayedPod("parent-pod", 15)
childPod = PodPause("child-pod")
})

Context("If parent timed out", func() {
It("on-error dependency must be followed", func() {
// Parent definition carries the timeout; the dependency to the child is marked on-error.
parentResDef := framework.WrapWithMetaAndCreate(parentPod, map[string]interface{}{"timeout": 5})
framework.ConnectWithMeta(parentResDef, framework.WrapAndCreate(childPod), map[string]string{"on-error": "true"})
framework.Run()
// The child pod is expected to reach the Running phase even though the parent timed out.
testutils.WaitForPod(framework.Clientset, framework.Namespace.Name, childPod.Name, v1.PodRunning)
})
})
})
})

func getKind(resdef *client.ResourceDefinition) string {
Expand Down Expand Up @@ -155,13 +175,26 @@ func (g GraphFramework) Wrap(obj runtime.Object) *client.ResourceDefinition {
return resdef
}

// WrapWithMeta wraps obj with Wrap and assigns meta to the Meta field of the
// resulting resource definition. The definition is returned without being created.
func (g GraphFramework) WrapWithMeta(obj runtime.Object, meta map[string]interface{}) *client.ResourceDefinition {
	wrapped := g.Wrap(obj)
	wrapped.Meta = meta

	return wrapped
}

// WrapAndCreate wraps obj with Wrap, creates the resulting resource definition
// through the framework client and returns it. The enclosing spec fails if the
// create call returns an error.
func (g GraphFramework) WrapAndCreate(obj runtime.Object) *client.ResourceDefinition {
	definition := g.Wrap(obj)

	_, createErr := g.Client.ResourceDefinitions().Create(definition)
	Expect(createErr).NotTo(HaveOccurred())

	return definition
}

// WrapWithMetaAndCreate wraps obj with WrapWithMeta (attaching meta), creates the
// resulting resource definition through the framework client and returns it.
// The enclosing spec fails if the create call returns an error.
func (g GraphFramework) WrapWithMetaAndCreate(obj runtime.Object, meta map[string]interface{}) *client.ResourceDefinition {
	definition := g.WrapWithMeta(obj, meta)

	_, createErr := g.Client.ResourceDefinitions().Create(definition)
	Expect(createErr).NotTo(HaveOccurred())

	return definition
}

func (g GraphFramework) ConnectWithMeta(first, second *client.ResourceDefinition, meta map[string]string) {
dep := &client.Dependency{
ObjectMeta: api.ObjectMeta{
Expand Down Expand Up @@ -197,3 +230,29 @@ func PodPause(name string) *v1.Pod {
},
}
}

// DelayedPod returns a pod called name with a single busybox container whose
// readiness probe runs `cat /tmp/health`. The container sleeps for delay
// seconds before writing that file, so the probe can only succeed after the
// delay; the container then sleeps for another 60 seconds.
func DelayedPod(name string, delay int) *v1.Pod {
	// Succeeds only once the container has written /tmp/health.
	readiness := &v1.Probe{
		Handler: v1.Handler{
			Exec: &v1.ExecAction{
				Command: []string{"/bin/cat", "/tmp/health"},
			},
		},
	}

	sleeper := v1.Container{
		Name:           "sleeper",
		Image:          "gcr.io/google_containers/busybox",
		Command:        []string{"/bin/sh"},
		Args:           []string{"-c", fmt.Sprintf("sleep %d; echo ok > /tmp/health; sleep 60", delay)},
		ReadinessProbe: readiness,
	}

	return &v1.Pod{
		ObjectMeta: v1.ObjectMeta{
			Name: name,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{sleeper},
		},
	}
}
15 changes: 15 additions & 0 deletions examples/timeout/create.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash

# Sets up the timeout example: deploys the app controller, creates the
# dependency and the pod definitions, then starts processing and follows the logs.

# Presumably provides $KUBECTL_NAME and wait-appcontroller — confirm in ../common.sh.
source ../common.sh

$KUBECTL_NAME create -f ../../manifests/appcontroller.yaml
wait-appcontroller

$KUBECTL_NAME create -f deps.yaml

# pod.yaml and timedout-pod.yaml are already Definition objects and are created as-is;
# pod2.yaml is a plain Pod, so it is piped through `kubeac wrap` (run inside
# k8s-appcontroller) before being created.
cat pod.yaml | $KUBECTL_NAME create -f -
cat pod2.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_NAME create -f -
cat timedout-pod.yaml | $KUBECTL_NAME create -f -

# Run ac-run inside k8s-appcontroller and stream its logs.
$KUBECTL_NAME exec k8s-appcontroller ac-run
$KUBECTL_NAME logs -f k8s-appcontroller
11 changes: 11 additions & 0 deletions examples/timeout/delete.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

# Tears down the timeout example: deletes the dependency, the pod definitions
# and finally the app controller itself.

# Presumably provides $KUBECTL_NAME — confirm in ../common.sh.
source ../common.sh

$KUBECTL_NAME delete -f deps.yaml

# pod2.yaml is a plain Pod, so it is piped through `kubeac wrap` (run inside
# k8s-appcontroller) to obtain the object to delete; the other two files are
# Definition objects already.
cat pod.yaml | $KUBECTL_NAME delete -f -
cat pod2.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_NAME delete -f -
cat timedout-pod.yaml | $KUBECTL_NAME delete -f -

# The controller is removed last because the wrap step above runs inside it.
$KUBECTL_NAME delete -f ../../manifests/appcontroller.yaml
8 changes: 8 additions & 0 deletions examples/timeout/deps.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Dependency of eventually-alive-pod2 (child) on timed-out-pod (parent).
# The "on-error" meta marks it as a dependency that is followed when the
# parent fails — in this example, when waiting for the parent times out.
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: ce1b11dc-2850-1dad-a7dd-302038af20af
meta:
on-error: "true"
parent: pod/timed-out-pod
child: pod/eventually-alive-pod2
24 changes: 24 additions & 0 deletions examples/timeout/pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Definition of a pod that becomes ready within its timeout: the readiness
# file is written after `sleep 30`, while the timeout meta is 60 (seconds).
apiVersion: appcontroller.k8s/v1alpha1
kind: Definition
metadata:
name: pod-eventually-alive-pod
meta:
timeout: 60
pod:
apiVersion: v1
kind: Pod
metadata:
name: eventually-alive-pod
spec:
containers:
- command: ["/bin/sh"]
args:
- -c
- sleep 30; echo ok > /tmp/health; sleep 60
image: gcr.io/google_containers/busybox
# Ready only once /tmp/health exists.
readinessProbe:
exec:
command:
- /bin/cat
- /tmp/health
name: test-container
13 changes: 13 additions & 0 deletions examples/timeout/pod2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Plain Pod (not a Definition): create.sh pipes it through `kubeac wrap`
# before creating it. It is the child of timed-out-pod in deps.yaml.
apiVersion: v1
kind: Pod
metadata:
name: eventually-alive-pod2
spec:
containers:
- command: ["/bin/sh"]
args:
- -c
- echo ok > /tmp/health
image: gcr.io/google_containers/busybox
name: test-container
restartPolicy: Never
24 changes: 24 additions & 0 deletions examples/timeout/timedout-pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Definition of a pod that is expected to time out: the readiness file is
# only written after `sleep 30`, while the timeout meta is 5 (seconds).
apiVersion: appcontroller.k8s/v1alpha1
kind: Definition
metadata:
name: pod-timed-out-pod
meta:
timeout: 5
pod:
apiVersion: v1
kind: Pod
metadata:
name: timed-out-pod
spec:
containers:
- command: ["/bin/sh"]
args:
- -c
- sleep 30; echo ok > /tmp/health; sleep 60
image: gcr.io/google_containers/busybox
# Ready only once /tmp/health exists.
readinessProbe:
exec:
command:
- /bin/cat
- /tmp/health
name: test-container
1 change: 0 additions & 1 deletion pkg/resources/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (p Pod) NameMatches(def client.ResourceDefinition, name string) bool {

// New returns a new Pod resource based on the resource definition: def.Pod,
// the Pods() accessor of client c and def.Meta are handed to NewPod.
func (p Pod) New(def client.ResourceDefinition, c client.Interface) interfaces.Resource {
log.Println("Creating a pod, meta", def.Meta)
return NewPod(def.Pod, c.Pods(), def.Meta)
}

Expand Down
48 changes: 34 additions & 14 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s DeploymentStatus) String() string {
// Timing defaults used while waiting for resources.
const (
// CheckInterval is the interval between rechecking the tree for updates.
CheckInterval = time.Millisecond * 1000
// WaitTimeout is the default wait timeout; createResources uses it when the
// resource has no positive "timeout" meta value.
WaitTimeout = time.Second * 600
)

// ScheduledResource is a wrapper for Resource with attached relationship data
Expand Down Expand Up @@ -107,35 +108,48 @@ func (sr *ScheduledResource) RequestCreation(toCreate chan *ScheduledResource) b
toCreate <- sr
return true
}

return false
}

// Wait periodically checks resource status and returns if the resource processing is finished,
// regardless successfull or not. The actual result of processing could be obtained from returned error.
func (sr *ScheduledResource) Wait(checkInterval time.Duration) error {
for {
status, err := sr.Status(nil)
if err != nil {
return err
}
func (sr *ScheduledResource) Wait(checkInterval time.Duration, timeout time.Duration) error {
ch := make(chan error, 1)
go func(ch chan error) {
for {
status, err := sr.Status(nil)
if err != nil {
ch <- err
}

if status == "ready" {
return nil
}
if status == "ready" {
ch <- nil
}

time.Sleep(checkInterval)
time.Sleep(checkInterval)
}
}(ch)

select {
case err := <-ch:
return err
case <-time.After(timeout):
e := fmt.Errorf("timeout waiting for resource %s", sr.Key())
sr.Lock()
defer sr.Unlock()
sr.Error = e
return e
}
}

// Status either returns cached copy of resource's status or retrieves it via Resource.Status
// depending on presence of cached copy and resource's settings
func (sr *ScheduledResource) Status(meta map[string]string) (string, error) {
sr.Lock()
defer sr.Unlock()
if (sr.status == "ready" || sr.Error != nil) && sr.Resource.StatusIsCacheable(meta) {
return sr.status, sr.Error
}
sr.Lock()
defer sr.Unlock()
status, err := sr.Resource.Status(meta)
sr.Error = err
if sr.Resource.StatusIsCacheable(meta) {
Expand Down Expand Up @@ -329,6 +343,12 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL
ccLimiter <- struct{}{}

attempts := resources.GetIntMeta(r.Resource, "retry", 1)
timeoutInSeconds := resources.GetIntMeta(r.Resource, "timeout", -1)

waitTimeout := WaitTimeout
if timeoutInSeconds > 0 {
waitTimeout = time.Second * time.Duration(timeoutInSeconds)
}

for attemptNo := 1; attemptNo <= attempts; attemptNo++ {

Expand Down Expand Up @@ -370,7 +390,7 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL

log.Printf("Checking status for %s", r.Key())

err = r.Wait(CheckInterval)
err = r.Wait(CheckInterval, waitTimeout)

if err == nil {
log.Printf("Resource %s created", r.Key())
Expand Down

0 comments on commit 4114c9b

Please sign in to comment.