Skip to content

Commit

Permalink
Update metadata in ArrayNode TaskExecutionEvents (#4355)
Browse files Browse the repository at this point in the history
* adding PluginIdentifier to TaskExecutionMetadata

Signed-off-by: Daniel Rammer <[email protected]>

* sending input and output metadata - removing validating check on flyteadmin temporarily

Signed-off-by: Daniel Rammer <[email protected]>

* readded task execution id verification

Signed-off-by: Daniel Rammer <[email protected]>

* removed dead code

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Nov 8, 2023
1 parent 630724f commit dd91cba
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,39 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
OccurredAt: occurredAt,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: e.externalResources,
PluginIdentifier: "container",
},
TaskType: "k8s-array",
EventVersion: 1,
}

// only attach input values if taskPhase is QUEUED meaning this the first evaluation
if taskPhase == idlcore.TaskExecution_QUEUED {
if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline {
// pass inputs by value
literalMap, err := nCtx.InputReader().Get(ctx)
if err != nil {
return err
}

taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputData{
InputData: literalMap,
}
} else {
// pass inputs by reference
taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputUri{
InputUri: nCtx.InputReader().GetInputPath().String(),
}
}
}

// only attach output uri if taskPhase is SUCCEEDED
if taskPhase == idlcore.TaskExecution_SUCCEEDED {
taskExecutionEvent.OutputResult = &event.TaskExecutionEvent_OutputUri{
OutputUri: v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir()).String(),
}
}

// record TaskExecutionEvent
return e.EventRecorder.RecordTaskEvent(ctx, taskExecutionEvent, eventConfig)
}
Expand Down

0 comments on commit dd91cba

Please sign in to comment.