From e984f6a2705baf87f9c286d3b692694e0ba91c93 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 13 Jan 2025 12:52:36 +0800 Subject: [PATCH] Streaming Deck Squash Signed-off-by: Future-Outlier --- .../transformers/node_execution.go | 1 + .../transformers/node_execution_test.go | 5 + .../pkg/controller/nodes/task/handler.go | 126 +++++++++++++++--- 3 files changed, 113 insertions(+), 19 deletions(-) diff --git a/flyteadmin/pkg/repositories/transformers/node_execution.go b/flyteadmin/pkg/repositories/transformers/node_execution.go index 107e9efb70..1f17f0f131 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution.go @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution "failed to marshal occurredAt into a timestamp proto with error: %v", err) } closure.StartedAt = startedAtProto + closure.DeckUri = request.GetEvent().GetDeckUri() return nil } diff --git a/flyteadmin/pkg/repositories/transformers/node_execution_test.go b/flyteadmin/pkg/repositories/transformers/node_execution_test.go index e37d312612..11dfd7256e 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution_test.go @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{ const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow" const testInputURI = "fake://bucket/inputs.pb" +const DeckURI = "fake://bucket/deck.html" var testInputs = &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) { Event: &event.NodeExecutionEvent{ Phase: core.NodeExecution_RUNNING, OccurredAt: startedAtProto, + DeckUri: DeckURI, }, } nodeExecutionModel := models.NodeExecution{} @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) { assert.Nil(t, err) assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt) assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt())) + assert.Equal(t, DeckURI, closure.GetDeckUri()) } func TestAddTerminalState_OutputURI(t *testing.T) { @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { OutputUri: outputURI, }, OccurredAt: occurredAtProto, + DeckUri: DeckURI, }, } startedAt := occurredAt.Add(-time.Minute) @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { assert.Nil(t, err) assert.EqualValues(t, outputURI, closure.GetOutputUri()) assert.Equal(t, time.Minute, nodeExecutionModel.Duration) + assert.Equal(t, DeckURI, closure.GetDeckUri()) } func TestAddTerminalState_OutputData(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 000d6bd7e7..d4fc654356 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -41,6 +41,13 @@ import ( const pluginContextKey = contextutils.Key("plugin") +type DeckStatus int + +const ( + DeckUnknown DeckStatus = iota + DeckEnabled +) + type metrics struct { pluginPanics labeled.Counter unsupportedTaskType labeled.Counter @@ -71,10 +78,47 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) { +func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) { + var deckURI *storage.DataReference + deckURIValue := tCtx.ow.GetDeckPath() + deckURI = &deckURIValue + + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{} + } + + p.execInfo.OutputInfo.DeckURI = deckURI +} + +func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error { + reader := tCtx.ow.GetReader() + if reader == nil && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + return nil + } + + exists, err := reader.DeckExists(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + return regErrors.Wrapf(err, "failed to check existence of deck file") + } + + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{} + } + + if exists { + deckURIValue := tCtx.ow.GetDeckPath() + p.execInfo.OutputInfo.DeckURI = &deckURIValue + } + + return nil +} + +func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) { p.ttype = handler.TransitionTypeEphemeral p.pInfo = pluginCore.PhaseInfoSuccess(nil) - p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) + p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) } func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { @@ -144,10 +188,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp return ToTaskExecutionEvent(input) } -func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { - p.execInfo.OutputInfo = &handler.OutputInfo{ - OutputURI: outputPath, - DeckURI: deckPath, +func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) { + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + OutputURI: outputPath, + } + } else { + p.execInfo.OutputInfo.OutputURI = outputPath } p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ @@ -171,7 +218,8 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle } logger.Debugf(ctx, "Task still running") - return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil + // Here will send the deck uri to flyteadmin + return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil } // The plugin interface available especially for testing. @@ -380,6 +428,32 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics return t.taskMetricsMap[metricNameKey], nil } +func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { + // GetDeckStatus determines whether a task generates a deck based on its execution context. + // + // This function ensures backward compatibility with older Flytekit versions using the following logic: + // 1. For Flytekit > 1.14.3, the task template's metadata includes the `generates_deck` flag: + // - If `generates_deck` is set to true, it indicates that the task generates a deck, and DeckEnabled is returned. + // 2. If `generates_deck` is set to false or is not set (likely from older Flytekit versions): + // - DeckUnknown is returned as a placeholder status. + // - In terminal states, a HEAD request can be made to check if the deck file exists. + // + // In future implementations, a `DeckDisabled` status could be introduced for better performance optimization: + // - This would eliminate the need for a HEAD request in the final phase. + // - However, the tradeoff is that a new field would need to be added to FlyteIDL to support this behavior. + + template, err := tCtx.tr.Read(ctx) + if err != nil { + return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") + } + + if template.GetMetadata().GetGeneratesDeck() { + return DeckEnabled, nil + } + + return DeckUnknown, nil +} + func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) { pluginTrns := &pluginRequestedTransition{} @@ -464,8 +538,26 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } } + // Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality. + // The deck should be accessible even if the task is still running or has failed. + // It's possible that the deck URI may not exist in remote storage yet or will never exist. + // So, it is console's responsibility to handle the case when the deck URI actually does not exist. + deckStatus, err := GetDeckStatus(ctx, tCtx) + if err != nil { + return nil, err + } + if deckStatus == DeckEnabled { + pluginTrns.AddDeckURI(tCtx) + } + switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: + if deckStatus == DeckUnknown { + err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) + } + if err != nil { + return pluginTrns, err + } // ------------------------------------- // TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes // This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute @@ -501,18 +593,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } else { - var deckURI *storage.DataReference - if tCtx.ow.GetReader() != nil { - exists, err := tCtx.ow.GetReader().DeckExists(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) - return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") - } else if exists { - deckURIValue := tCtx.ow.GetDeckPath() - deckURI = &deckURIValue - } - } - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, + pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) @@ -520,6 +601,13 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: + // This is for backward compatibility with older Flytekit versions. + if deckStatus == DeckUnknown { + err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) + } + if err != nil { + return pluginTrns, err + } pluginTrns.ObservedFailure( &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),