From 78efbf84a5cb03e5a6269a0f1ee4b7772b601249 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Wed, 13 Dec 2023 10:32:21 -0600 Subject: [PATCH 1/3] writing zero length inputs (#4594) Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 23062a8cb3..8e96ee9645 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil } - if nodeInputs != nil && len(nodeInputs.Literals) > 0 { + if nodeInputs != nil { inputsFile := v1alpha1.GetInputsFile(dataDir) if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil { c.metrics.InputsWriteFailure.Inc(ctx) From a5ea1c870572d8f8e9802a3b09f41e4ced6a7715 Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Wed, 13 Dec 2023 08:49:51 -0800 Subject: [PATCH 2/3] Feature/add pod pending timeout config (#4590) * add PodPendingTimeout config to fail pods stuck in pending Signed-off-by: Paul Dittamo * update unit tests LastTransitionTime for pod pending timeout Signed-off-by: Paul Dittamo * set default podpending timeout 0 as to be ignored Signed-off-by: Paul Dittamo * maintain pod condition info during pod pending timeout Signed-off-by: Paul Dittamo * Remove un-needed config set for unit test Signed-off-by: Paul Dittamo * cleanup Signed-off-by: Paul Dittamo * update unit test Signed-off-by: Paul Dittamo * don't export helper function Signed-off-by: Paul Dittamo * remove always nil error return param Signed-off-by: Paul Dittamo * cleanup Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo 
Co-authored-by: Dan Rammer --- .../pluginmachinery/flytek8s/config/config.go | 8 +++ .../pluginmachinery/flytek8s/pod_helper.go | 55 +++++++++++++------ .../flytek8s/pod_helper_test.go | 35 ++++++++++++ 3 files changed, 80 insertions(+), 18 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 4e777ee154..55f9cfa68c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -55,6 +55,9 @@ var ( ImagePullBackoffGracePeriod: config2.Duration{ Duration: time.Minute * 3, }, + PodPendingTimeout: config2.Duration{ + Duration: 0, + }, GpuDeviceNodeLabel: "k8s.amazonaws.com/accelerator", GpuPartitionSizeNodeLabel: "k8s.amazonaws.com/gpu-partition-size", GpuResourceName: ResourceNvidiaGPU, @@ -149,6 +152,11 @@ type K8sPluginConfig struct { // one, and the corresponding task marked as failed ImagePullBackoffGracePeriod config2.Duration `json:"image-pull-backoff-grace-period" pflag:"-,Time to wait for transient ImagePullBackoff errors to be resolved."` + // Time to wait while pod is in pending phase. If the pod is stuck in + // pending phase past this timeout, it will be inferred to be a permanent + // issue, and the corresponding task marked as failed + PodPendingTimeout config2.Duration `json:"pod-pending-timeout" pflag:"-,Time to wait while pod is stuck in pending."` + // The node label that specifies the attached GPU device. 
GpuDeviceNodeLabel string `json:"gpu-device-node-label" pflag:"-,The node label that specifies the attached GPU device."` diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go index d8cc4dcc7f..2f3447ad0e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -572,14 +572,38 @@ func BuildIdentityPod() *v1.Pod { // and hence input gates. We should not allow bad requests that Request for large number of resource through. // In the case it makes through, we will fail after timeout func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { + phaseInfo, t := demystifyPendingHelper(status) + + if phaseInfo.Phase().IsTerminal() { + return phaseInfo, nil + } + + podPendingTimeout := config.GetK8sPluginConfig().PodPendingTimeout.Duration + if podPendingTimeout > 0 && time.Since(t) >= podPendingTimeout { + return pluginsCore.PhaseInfoRetryableFailureWithCleanup("PodPendingTimeout", phaseInfo.Reason(), &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), nil + } + + if phaseInfo.Phase() != pluginsCore.PhaseUndefined { + return phaseInfo, nil + } + + return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil +} + +func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Time) { // Search over the difference conditions in the status object. Note that the 'Pending' this function is // demystifying is the 'phase' of the pod status. This is different than the PodReady condition type also used below + phaseInfo := pluginsCore.PhaseInfoUndefined + t := time.Now() for _, c := range status.Conditions { + t = c.LastTransitionTime.Time switch c.Type { case v1.PodScheduled: if c.Status == v1.ConditionFalse { // Waiting to be scheduled. This usually refers to inability to acquire resources. 
- return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil + return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t } case v1.PodReasonUnschedulable: @@ -592,7 +616,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // reason: Unschedulable // status: "False" // type: PodScheduled - return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil + return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t case v1.PodReady: if c.Status == v1.ConditionFalse { @@ -637,7 +661,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // ErrImagePull -> Transitionary phase to ImagePullBackOff // ContainerCreating -> Image is being downloaded // PodInitializing -> Init containers are running - return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil + return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t case "CreateContainerError": // This may consist of: @@ -659,48 +683,45 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // synced, and therefore, only provides an // approximation of the elapsed time since the last // transition. 
- t := c.LastTransitionTime.Time + gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration if time.Since(t) >= gracePeriod { return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } return pluginsCore.PhaseInfoInitializing( t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}, - ), nil + ), t case "CreateContainerConfigError": - t := c.LastTransitionTime.Time gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration if time.Since(t) >= gracePeriod { return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } return pluginsCore.PhaseInfoInitializing( t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}, - ), nil + ), t case "InvalidImageName": - t := c.LastTransitionTime.Time return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t case "ImagePullBackOff": - t := c.LastTransitionTime.Time gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration if time.Since(t) >= gracePeriod { return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } return pluginsCore.PhaseInfoInitializing( @@ -708,7 +729,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}, - ), nil + ), t default: // Since we are not checking for all error states, we may end up perpetually @@ -716,12 +737,10 @@ func DemystifyPending(status 
v1.PodStatus) (pluginsCore.PhaseInfo, error) { // by K8s and we get elusive 'pod not found' errors // So be default if the container is not waiting with the PodInitializing/ContainerCreating // reasons, then we will assume a failure reason, and fail instantly - t := c.LastTransitionTime.Time return pluginsCore.PhaseInfoSystemRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } - } } } @@ -729,7 +748,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { } } - return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil + return phaseInfo, t } func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string { diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index a98bfe6b4f..925cb00186 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -1181,6 +1181,9 @@ func TestDemystifyPending(t *testing.T) { ImagePullBackoffGracePeriod: config1.Duration{ Duration: time.Minute * 3, }, + PodPendingTimeout: config1.Duration{ + Duration: 0, + }, })) t.Run("PodNotScheduled", func(t *testing.T) { @@ -1478,6 +1481,38 @@ func TestDemystifyPending(t *testing.T) { }) } +func TestDemystifyPendingTimeout(t *testing.T) { + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + CreateContainerErrorGracePeriod: config1.Duration{ + Duration: time.Minute * 3, + }, + ImagePullBackoffGracePeriod: config1.Duration{ + Duration: time.Minute * 3, + }, + PodPendingTimeout: config1.Duration{ + Duration: 10, + }, + })) + + s := v1.PodStatus{ + Phase: v1.PodPending, + Conditions: []v1.PodCondition{ + { + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + }, + }, + } + s.Conditions[0].LastTransitionTime.Time = 
metav1.Now().Add(-config.GetK8sPluginConfig().PodPendingTimeout.Duration) + + t.Run("PodPendingExceedsTimeout", func(t *testing.T) { + taskStatus, err := DemystifyPending(s) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.Equal(t, "PodPendingTimeout", taskStatus.Err().Code) + }) +} + func TestDemystifySuccess(t *testing.T) { t.Run("OOMKilled", func(t *testing.T) { phaseInfo, err := DemystifySuccess(v1.PodStatus{ From 2b757850083e506c33b0aa5390e57701f13bcfb1 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Wed, 13 Dec 2023 09:10:39 -0800 Subject: [PATCH 3/3] Run single-binary gh workflows on all PRs (#4589) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- .github/workflows/single-binary.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.github/workflows/single-binary.yml b/.github/workflows/single-binary.yml index b879c98042..e693b0d3d1 100644 --- a/.github/workflows/single-binary.yml +++ b/.github/workflows/single-binary.yml @@ -6,14 +6,6 @@ concurrency: on: pull_request: - paths: - - .github/workflows/single-binary.yml - - charts/flyte-binary/** - - charts/flyte-sandbox/** - - cmd/** - - docker/sandbox-bundled/** - - Dockerfile - - go.* push: branches: - master