diff --git a/e2e/basic_test.go b/e2e/basic_test.go index 0ca1f83..816bf32 100644 --- a/e2e/basic_test.go +++ b/e2e/basic_test.go @@ -15,6 +15,7 @@ package integration import ( + "fmt" "time" . "github.com/onsi/ginkgo" @@ -125,6 +126,25 @@ var _ = Describe("Basic Suite", func() { }) }) }) + + Describe("Failure handling - timeout", func() { + var parentPod *v1.Pod + var childPod *v1.Pod + + BeforeEach(func() { + parentPod = DelayedPod("parent-pod", 15) + childPod = PodPause("child-pod") + }) + + Context("If parent timed out", func() { + It("on-error dependency must be followed", func() { + parentResDef := framework.WrapWithMetaAndCreate(parentPod, map[string]interface{}{"timeout": 5}) + framework.ConnectWithMeta(parentResDef, framework.WrapAndCreate(childPod), map[string]string{"on-error": "true"}) + framework.Run() + testutils.WaitForPod(framework.Clientset, framework.Namespace.Name, childPod.Name, v1.PodRunning) + }) + }) + }) }) func getKind(resdef *client.ResourceDefinition) string { @@ -155,6 +175,12 @@ func (g GraphFramework) Wrap(obj runtime.Object) *client.ResourceDefinition { return resdef } +func (g GraphFramework) WrapWithMeta(obj runtime.Object, meta map[string]interface{}) *client.ResourceDefinition { + resdef := g.Wrap(obj) + resdef.Meta = meta + return resdef +} + func (g GraphFramework) WrapAndCreate(obj runtime.Object) *client.ResourceDefinition { resdef := g.Wrap(obj) _, err := g.Client.ResourceDefinitions().Create(resdef) @@ -162,6 +188,13 @@ func (g GraphFramework) WrapAndCreate(obj runtime.Object) *client.ResourceDefini return resdef } +func (g GraphFramework) WrapWithMetaAndCreate(obj runtime.Object, meta map[string]interface{}) *client.ResourceDefinition { + resdef := g.WrapWithMeta(obj, meta) + _, err := g.Client.ResourceDefinitions().Create(resdef) + Expect(err).NotTo(HaveOccurred()) + return resdef +} + func (g GraphFramework) ConnectWithMeta(first, second *client.ResourceDefinition, meta map[string]string) { dep := &client.Dependency{ 
ObjectMeta: api.ObjectMeta{ @@ -197,3 +230,29 @@ func PodPause(name string) *v1.Pod { }, } } + +func DelayedPod(name string, delay int) *v1.Pod { + cmdArg := fmt.Sprintf("sleep %d; echo ok > /tmp/health; sleep 60", delay) + return &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "sleeper", + Image: "gcr.io/google_containers/busybox", + Command: []string{"/bin/sh"}, + Args: []string{"-c", cmdArg}, + ReadinessProbe: &v1.Probe{ + Handler: v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/cat", "/tmp/health"}, + }, + }, + }, + }, + }, + }, + } +} diff --git a/examples/timeout/create.sh b/examples/timeout/create.sh new file mode 100755 index 0000000..d6d98e0 --- /dev/null +++ b/examples/timeout/create.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +source ../common.sh + +$KUBECTL_NAME create -f ../../manifests/appcontroller.yaml +wait-appcontroller + +$KUBECTL_NAME create -f deps.yaml + +cat pod.yaml | $KUBECTL_NAME create -f - +cat pod2.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_NAME create -f - +cat timedout-pod.yaml | $KUBECTL_NAME create -f - + +$KUBECTL_NAME exec k8s-appcontroller ac-run +$KUBECTL_NAME logs -f k8s-appcontroller diff --git a/examples/timeout/delete.sh b/examples/timeout/delete.sh new file mode 100755 index 0000000..d773187 --- /dev/null +++ b/examples/timeout/delete.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +source ../common.sh + +$KUBECTL_NAME delete -f deps.yaml + +cat pod.yaml | $KUBECTL_NAME delete -f - +cat pod2.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_NAME delete -f - +cat timedout-pod.yaml | $KUBECTL_NAME delete -f - + +$KUBECTL_NAME delete -f ../../manifests/appcontroller.yaml diff --git a/examples/timeout/deps.yaml b/examples/timeout/deps.yaml new file mode 100644 index 0000000..c597661 --- /dev/null +++ b/examples/timeout/deps.yaml @@ -0,0 +1,8 @@ +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: 
ce1b11dc-2850-1dad-a7dd-302038af20af +meta: + on-error: "true" +parent: pod/timed-out-pod +child: pod/eventually-alive-pod2 diff --git a/examples/timeout/pod.yaml b/examples/timeout/pod.yaml new file mode 100644 index 0000000..d8bb4ce --- /dev/null +++ b/examples/timeout/pod.yaml @@ -0,0 +1,24 @@ +apiVersion: appcontroller.k8s/v1alpha1 +kind: Definition +metadata: + name: pod-eventually-alive-pod +meta: + timeout: 60 +pod: + apiVersion: v1 + kind: Pod + metadata: + name: eventually-alive-pod + spec: + containers: + - command: ["/bin/sh"] + args: + - -c + - sleep 30; echo ok > /tmp/health; sleep 60 + image: gcr.io/google_containers/busybox + readinessProbe: + exec: + command: + - /bin/cat + - /tmp/health + name: test-container diff --git a/examples/timeout/pod2.yaml b/examples/timeout/pod2.yaml new file mode 100644 index 0000000..6c6beaa --- /dev/null +++ b/examples/timeout/pod2.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Pod +metadata: + name: eventually-alive-pod2 +spec: + containers: + - command: ["/bin/sh"] + args: + - -c + - echo ok > /tmp/health + image: gcr.io/google_containers/busybox + name: test-container + restartPolicy: Never diff --git a/examples/timeout/timedout-pod.yaml b/examples/timeout/timedout-pod.yaml new file mode 100644 index 0000000..f181be6 --- /dev/null +++ b/examples/timeout/timedout-pod.yaml @@ -0,0 +1,24 @@ +apiVersion: appcontroller.k8s/v1alpha1 +kind: Definition +metadata: + name: pod-timed-out-pod +meta: + timeout: 5 +pod: + apiVersion: v1 + kind: Pod + metadata: + name: timed-out-pod + spec: + containers: + - command: ["/bin/sh"] + args: + - -c + - sleep 30; echo ok > /tmp/health; sleep 60 + image: gcr.io/google_containers/busybox + readinessProbe: + exec: + command: + - /bin/cat + - /tmp/health + name: test-container diff --git a/pkg/resources/pod.go b/pkg/resources/pod.go index d632da0..fe84cf6 100644 --- a/pkg/resources/pod.go +++ b/pkg/resources/pod.go @@ -92,7 +92,6 @@ func (p Pod) NameMatches(def client.ResourceDefinition, name 
string) bool {
 
 // New returns new Pod based on resource definition
 func (p Pod) New(def client.ResourceDefinition, c client.Interface) interfaces.Resource {
-	log.Println("Creating a pod, meta", def.Meta)
 	return NewPod(def.Pod, c.Pods(), def.Meta)
 }
 
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index ea5f1ed..62f5110 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -72,6 +72,8 @@ func (s DeploymentStatus) String() string {
 // CheckInterval is an interval between rechecking the tree for updates
 const (
 	CheckInterval = time.Millisecond * 1000
+	// WaitTimeout is the limit on how long Wait blocks when a resource has no positive "timeout" meta
+	WaitTimeout = time.Second * 600
 )
 
 // ScheduledResource is a wrapper for Resource with attached relationship data
@@ -107,35 +109,61 @@ func (sr *ScheduledResource) RequestCreation(toCreate chan *ScheduledResource) b
 		toCreate <- sr
 		return true
 	}
-
 	return false
 }
 
-// Wait periodically checks resource status and returns if the resource processing is finished,
-// regardless successfull or not. The actual result of processing could be obtained from returned error.
-func (sr *ScheduledResource) Wait(checkInterval time.Duration) error {
-	for {
-		status, err := sr.Status(nil)
-		if err != nil {
-			return err
-		}
-
-		if status == "ready" {
-			return nil
-		}
-
-		time.Sleep(checkInterval)
-	}
-}
+// Wait periodically checks resource status and returns once the resource processing is finished,
+// whether successfully or not, or once timeout has elapsed. The outcome is reported through the
+// returned error: nil when the resource became ready, the status error when the check failed, or a
+// timeout error (also stored in sr.Error) when the resource did not finish in time.
+func (sr *ScheduledResource) Wait(checkInterval time.Duration, timeout time.Duration) error {
+	// Buffered so the poller can always deliver its single result, even after Wait has given up.
+	ch := make(chan error, 1)
+	// Closed when Wait returns so the poller stops instead of polling (and leaking) forever.
+	done := make(chan struct{})
+	defer close(done)
+
+	go func() {
+		for {
+			status, err := sr.Status(nil)
+			if err != nil {
+				ch <- err
+				return
+			}
+
+			if status == "ready" {
+				ch <- nil
+				return
+			}
+
+			select {
+			case <-done:
+				return
+			case <-time.After(checkInterval):
+			}
+		}
+	}()
+
+	select {
+	case err := <-ch:
+		return err
+	case <-time.After(timeout):
+		e := fmt.Errorf("timeout waiting for resource %s", sr.Key())
+		sr.Lock()
+		defer sr.Unlock()
+		sr.Error = e
+		return e
+	}
+}
 
 // Status either returns cached copy of resource's status or retrieves it via Resource.Status
 // depending on presense of cached copy and resource's settings
 func (sr *ScheduledResource) Status(meta map[string]string) (string, error) {
+	sr.Lock()
+	defer sr.Unlock()
 	if (sr.status == "ready" || sr.Error != nil) && sr.Resource.StatusIsCacheable(meta) {
 		return sr.status, sr.Error
 	}
-	sr.Lock()
-	defer sr.Unlock()
 	status, err := sr.Resource.Status(meta)
 	sr.Error = err
 	if sr.Resource.StatusIsCacheable(meta) {
@@ -329,6 +357,12 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL
 			ccLimiter <- struct{}{}
 
 			attempts := resources.GetIntMeta(r.Resource, "retry", 1)
+			timeoutInSeconds := resources.GetIntMeta(r.Resource, "timeout", -1)
+
+			waitTimeout := WaitTimeout
+			if timeoutInSeconds > 0 {
+				waitTimeout = time.Second * time.Duration(timeoutInSeconds)
+			}
 
 			for attemptNo := 1; attemptNo <= attempts; attemptNo++ {
 
@@ -370,7 +404,7 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL
 
 				log.Printf("Checking status for %s", r.Key())
 
-				err = r.Wait(CheckInterval)
+				err = r.Wait(CheckInterval, waitTimeout)
 				if err == nil {
 					log.Printf("Resource %s created", r.Key())