Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Flyteadmin] Add variablemap in dataproxy for dataclass/pydantic #6136

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions flyteadmin/dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,13 @@
}

var lm *core.LiteralMap
var vm *core.VariableMap
if ioType == common.ArtifactTypeI {
lm = resp.GetFullInputs()
vm = resp.GetInputVariableMap()
} else if ioType == common.ArtifactTypeO {
lm = resp.GetFullOutputs()
vm = resp.GetOutputVariableMap()

Check warning on line 376 in flyteadmin/dataproxy/service.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/dataproxy/service.go#L376

Added line #L376 was not covered by tests
} else {
// Assume deck, and create a download link request
dlRequest := service.CreateDownloadLinkRequest{
Expand Down Expand Up @@ -402,13 +405,15 @@
Data: &service.GetDataResponse_LiteralMap{
LiteralMap: lm,
},
VariableMap: vm,
}, nil
}

func (s Service) GetDataFromTaskExecution(ctx context.Context, taskExecID *core.TaskExecutionIdentifier, ioType common.ArtifactType, name string) (
*service.GetDataResponse, error) {

var lm *core.LiteralMap
var vm *core.VariableMap
reqT := &admin.TaskExecutionGetDataRequest{
Id: taskExecID,
}
Expand All @@ -419,8 +424,10 @@

if ioType == common.ArtifactTypeI {
lm = resp.GetFullInputs()
vm = resp.GetInputVariableMap()

Check warning on line 427 in flyteadmin/dataproxy/service.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/dataproxy/service.go#L427

Added line #L427 was not covered by tests
} else if ioType == common.ArtifactTypeO {
lm = resp.GetFullOutputs()
vm = resp.GetOutputVariableMap()
} else {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "deck type cannot be specified with a retry attempt, just use the node instead")
}
Expand All @@ -440,6 +447,7 @@
Data: &service.GetDataResponse_LiteralMap{
LiteralMap: lm,
},
VariableMap: vm,
}, nil

}
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down Expand Up @@ -148,7 +148,7 @@ require (
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/mattn/goveralls v0.0.6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -161,7 +161,7 @@ require (
github.com/ory/go-convenience v0.1.0 // indirect
github.com/ory/viper v1.7.5 // indirect
github.com/pborman/uuid v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
Expand All @@ -173,7 +173,7 @@ require (
github.com/sendgrid/rest v2.6.9+incompatible // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.11.0 // indirect
Expand Down
18 changes: 9 additions & 9 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/elazarl/goproxy v0.0.0-20181003060214-f58a169a71a5/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk=
github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -972,8 +972,8 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
Expand Down Expand Up @@ -1088,8 +1088,8 @@ github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUr
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.8.0/go.mod h1:D6yutnOGMveHEPV7VQOuvI/gXY61bv+9bAOTRnLElKs=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 h1:dy81yyLYJDwMTifq24Oi/IslOslRrDSb3jwDggjz3Z0=
github.com/pelletier/go-toml/v2 v2.0.0-beta.8/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
Expand Down Expand Up @@ -1218,8 +1218,8 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.2.0/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/afero v1.3.2/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo=
github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw=
github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
Expand Down Expand Up @@ -1655,10 +1655,10 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
84 changes: 79 additions & 5 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,17 +545,91 @@
return err
})

// Get the output variable map from workflow model TypedInterface
var inputVariableMap, outputVariableMap *core.VariableMap
group.Go(func() error {
var err error

switch nodeExecution.GetClosure().GetTargetMetadata().(type) {
case *admin.NodeExecutionClosure_WorkflowNodeMetadata:
execID := nodeExecution.GetClosure().GetTargetMetadata().(*admin.NodeExecutionClosure_WorkflowNodeMetadata).WorkflowNodeMetadata.GetExecutionId()
workflowModel, err := m.db.WorkflowRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: execID.GetProject(),
Domain: execID.GetDomain(),
Name: execID.GetName(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get workflow model for node execution [%+v] with err %v", request.GetId(), err)
return err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(groupCtx, "Failed to transform workflow model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

Check warning on line 570 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L554-L570

Added lines #L554 - L570 were not covered by tests

inputVariableMap = workflow.GetClosure().GetCompiledWorkflow().GetPrimary().GetTemplate().GetInterface().GetInputs()
outputVariableMap = workflow.GetClosure().GetCompiledWorkflow().GetPrimary().GetTemplate().GetInterface().GetOutputs()

Check warning on line 573 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L572-L573

Added lines #L572 - L573 were not covered by tests

case *admin.NodeExecutionClosure_TaskNodeMetadata:
execID := nodeExecution.GetId().GetExecutionId()
executionModel, err := m.db.ExecutionRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: execID.GetProject(),
Domain: execID.GetDomain(),
Name: execID.GetName(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get execution model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

Check warning on line 586 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L575-L586

Added lines #L575 - L586 were not covered by tests

execution, err := transformers.FromExecutionModel(groupCtx, executionModel, transformers.DefaultExecutionTransformerOptions)

if err != nil {
logger.Debugf(groupCtx, "Failed to transform execution model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

Check warning on line 593 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L588-L593

Added lines #L588 - L593 were not covered by tests

taskModel, err := m.db.TaskRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: execID.GetProject(),
Domain: execID.GetDomain(),
Name: execution.GetSpec().GetLaunchPlan().GetName(),
Version: execution.GetSpec().GetLaunchPlan().GetVersion(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get task model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

Check warning on line 605 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L595-L605

Added lines #L595 - L605 were not covered by tests

task, err := transformers.FromTaskModel(taskModel)

if err != nil {
logger.Debugf(groupCtx, "Failed to transform task model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

Check warning on line 612 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L607-L612

Added lines #L607 - L612 were not covered by tests

inputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetInputs()
outputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetOutputs()

Check warning on line 615 in flyteadmin/pkg/manager/impl/node_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/node_execution_manager.go#L614-L615

Added lines #L614 - L615 were not covered by tests
}
return err
})

err = group.Wait()
if err != nil {
return nil, err
}

response := &admin.NodeExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromNodeExecutionID(request.GetId(), nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""),
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromNodeExecutionID(request.GetId(), nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""),
InputVariableMap: inputVariableMap,
OutputVariableMap: outputVariableMap,
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
Expand Down
40 changes: 35 additions & 5 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,24 +328,54 @@

var outputs *core.LiteralMap
var outputURLBlob *admin.UrlBlob
var inputVariableMap, outputVariableMap *core.VariableMap
group.Go(func() error {
var err error
outputs, outputURLBlob, err = util.GetOutputs(groupCtx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, taskExecution.GetClosure())
return err
})

group.Go(func() error {
var err error
taskModel, err := m.db.TaskRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: taskExecution.GetId().GetTaskId().GetProject(),
Domain: taskExecution.GetId().GetTaskId().GetDomain(),
Name: taskExecution.GetId().GetTaskId().GetName(),
Version: taskExecution.GetId().GetTaskId().GetVersion(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get task [%+v] with err %v", taskExecution.GetId().GetTaskId(), err)
return err
}

Check warning on line 351 in flyteadmin/pkg/manager/impl/task_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/task_execution_manager.go#L349-L351

Added lines #L349 - L351 were not covered by tests

task, err := transformers.FromTaskModel(taskModel)

if err != nil {
logger.Debugf(groupCtx, "Failed to transform task model [%+v] with err %v", taskModel, err)
return err
}

Check warning on line 358 in flyteadmin/pkg/manager/impl/task_execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/task_execution_manager.go#L356-L358

Added lines #L356 - L358 were not covered by tests

inputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetInputs()
outputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetOutputs()

return err
})

err = group.Wait()
if err != nil {
return nil, err
}

response := &admin.TaskExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromTaskExecutionID(request.GetId(), false),
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromTaskExecutionID(request.GetId(), false),
OutputVariableMap: outputVariableMap,
InputVariableMap: inputVariableMap,
}

m.metrics.TaskExecutionInputBytes.Observe(float64(response.GetInputs().GetBytes()))
Expand Down
15 changes: 15 additions & 0 deletions flyteidl/clients/go/admin/mocks/isGetDataResponse_VariableMap.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/node_execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading