Skip to content

Commit

Permalink
artf/updates to source (#4443)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Nov 21, 2023
1 parent 4dbcba3 commit c3ce7aa
Show file tree
Hide file tree
Showing 40 changed files with 12,696 additions and 6,953 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
Principal: spec.GetMetadata().Principal,
LaunchPlanId: spec.LaunchPlan,
}, nil
}
Expand Down Expand Up @@ -371,6 +372,7 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
Principal: spec.GetMetadata().Principal,
LaunchPlanId: spec.LaunchPlan,
}, nil
}
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
}

// ExtractArtifactKeys pulls out artifact keys from Literals for lineage
// todo: rename this function to be less confusing
func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
var artifactKeys []string

Expand All @@ -713,7 +714,6 @@ func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
}
}
if input.GetCollection() != nil {
// TODO: Make recursive
for _, v := range input.GetCollection().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
Expand Down Expand Up @@ -1218,6 +1218,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, inputArtifactKeys []string, usedArtifactIDs []*core.ArtifactID) {

if len(inputArtifactKeys) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with input artifact keys [%+v] and used artifact ids [%+v]", executionID, inputArtifactKeys, usedArtifactIDs)

Expand Down
7 changes: 6 additions & 1 deletion flyteartifacts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/flyteorg/flyte/flyteartifacts
go 1.19

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/NYTimes/gizmo v1.3.6
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.14.0
Expand All @@ -21,6 +22,7 @@ require (
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/driver/postgres v1.5.3
gorm.io/gorm v1.25.5
)

Expand Down Expand Up @@ -61,7 +63,11 @@ require (
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
Expand Down Expand Up @@ -105,7 +111,6 @@ require (
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/postgres v1.5.3 // indirect
gorm.io/driver/sqlite v1.5.4 // indirect
k8s.io/apimachinery v0.24.1 // indirect
k8s.io/client-go v0.24.1 // indirect
Expand Down
82 changes: 82 additions & 0 deletions flyteartifacts/go.sum

Large diffs are not rendered by default.

23 changes: 18 additions & 5 deletions flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ type ArtifactKey struct {
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}

// WorkflowExecution is the GORM model for a workflow execution that artifacts are linked to.
// The Project/Domain is assumed to always be the same as the Artifact.
// The three Execution* columns together form the composite unique index idx_we_pdn,
// and each column additionally carries its own single-column index.
type WorkflowExecution struct {
// gorm.Model is embedded; Artifact.WorkflowExecution references this model's ID.
gorm.Model
ExecutionProject string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_proj;type:varchar(64)"`
ExecutionDomain string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_dom;type:varchar(64)"`
ExecutionName string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_name;type:varchar(255)"`
// InputArtifacts is a many-to-many association declared with the execution_inputs join table;
// presumably the artifacts consumed as inputs by this execution — confirm against the writers of that table.
InputArtifacts []Artifact `gorm:"many2many:execution_inputs;"`
}

type Artifact struct {
gorm.Model
ArtifactKeyID uint
ArtifactKeyID uint `gorm:"not null;uniqueIndex:idx_artifact_version"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_artifact_version"`
Version string `gorm:"not null;type:varchar(255);uniqueIndex:idx_artifact_version"`
Partitions pgtype.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`

LiteralType []byte `gorm:"not null"`
Expand All @@ -26,8 +36,10 @@ type Artifact struct {
MetadataType string `gorm:"type:varchar(64)"`
OffloadedUserMetadata string `gorm:"type:varchar(255)"`

// Project/Domain assumed to always be the same as the Artifact
ExecutionName string `gorm:"type:varchar(255)"`
WorkflowExecutionID uint `gorm:"index:idx_artifact_wf_exec_id"`
WorkflowExecution WorkflowExecution `gorm:"foreignKey:WorkflowExecutionID;references:ID"`
NodeID string `gorm:"type:varchar(128)"`

WorkflowProject string `gorm:"type:varchar(64)"`
WorkflowDomain string `gorm:"type:varchar(64)"`
WorkflowName string `gorm:"type:varchar(255)"`
Expand All @@ -36,10 +48,11 @@ type Artifact struct {
TaskDomain string `gorm:"type:varchar(64)"`
TaskName string `gorm:"type:varchar(255)"`
TaskVersion string `gorm:"type:varchar(255)"`
NodeID string `gorm:"type:varchar(64)"`
// See Admin migration for note.
// Here nullable in the case of workflow output.
RetryAttempt *uint32

Principal string `gorm:"type:varchar(256)"`
}

type TriggerKey struct {
Expand Down
65 changes: 49 additions & 16 deletions flyteartifacts/pkg/db/gorm_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,31 @@ func ServiceToGormModel(serviceModel models.Artifact) (Artifact, error) {
Description: serviceModel.Artifact.Spec.ShortDescription,
MetadataType: serviceModel.Artifact.Spec.MetadataType,
OffloadedUserMetadata: serviceModel.OffloadedMetadata,
}

ExecutionName: serviceModel.Artifact.Spec.Execution.Name,
if serviceModel.Artifact.GetSource().GetWorkflowExecution() != nil {
// artifact and execution project/domains are always the same.
// Note the service model will not have workflow execution if it was an upload
wfExec := WorkflowExecution{
ExecutionProject: serviceModel.Artifact.ArtifactId.ArtifactKey.Project,
ExecutionDomain: serviceModel.Artifact.ArtifactId.ArtifactKey.Domain,
ExecutionName: serviceModel.Source.WorkflowExecution.Name,
}
ga.WorkflowExecution = wfExec
ga.NodeID = serviceModel.Source.NodeId
}
if serviceModel.GetSource() != nil {
ga.Principal = serviceModel.GetSource().GetPrincipal()
}

if serviceModel.Artifact.Spec.TaskExecution != nil {
ga.TaskProject = serviceModel.Artifact.Spec.TaskExecution.TaskId.Project
ga.TaskDomain = serviceModel.Artifact.Spec.TaskExecution.TaskId.Domain
ga.TaskName = serviceModel.Artifact.Spec.TaskExecution.TaskId.Name
ga.TaskVersion = serviceModel.Artifact.Spec.TaskExecution.TaskId.Version
ga.RetryAttempt = &serviceModel.Artifact.Spec.TaskExecution.RetryAttempt
if serviceModel.GetSource().GetTaskId() != nil {
// If task id is there, so should the retry attempt
retry := serviceModel.GetSource().GetRetryAttempt()
ga.RetryAttempt = &retry
ga.TaskProject = serviceModel.GetSource().GetTaskId().Project
ga.TaskDomain = serviceModel.GetSource().GetTaskId().Domain
ga.TaskName = serviceModel.GetSource().GetTaskId().Name
ga.TaskVersion = serviceModel.GetSource().GetTaskId().Version
}

return ga, nil
Expand Down Expand Up @@ -102,15 +117,8 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {
Version: ga.Version,
},
Spec: &artifact.ArtifactSpec{
Value: lit,
Type: lt,
TaskExecution: nil,
Execution: &core.WorkflowExecutionIdentifier{
Project: ga.ArtifactKey.Project,
Domain: ga.ArtifactKey.Domain,
Name: ga.ExecutionName,
},
Principal: "",
Value: lit,
Type: lt,
ShortDescription: ga.Description,
UserMetadata: nil,
MetadataType: ga.MetadataType,
Expand All @@ -121,6 +129,31 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {
if p != nil {
a.ArtifactId.Dimensions = &core.ArtifactID_Partitions{Partitions: p}
}
aSrc := artifact.ArtifactSource{
NodeId: ga.NodeID,
Principal: ga.Principal,
}
if ga.RetryAttempt != nil {
aSrc.RetryAttempt = *ga.RetryAttempt
}
if ga.WorkflowExecutionID != 0 {
execID := &core.WorkflowExecutionIdentifier{
Project: ga.ArtifactKey.Project,
Domain: ga.ArtifactKey.Domain,
Name: ga.WorkflowExecution.ExecutionName,
}
aSrc.WorkflowExecution = execID
}
if ga.TaskProject != "" {
aSrc.TaskId = &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: ga.TaskProject,
Domain: ga.TaskDomain,
Name: ga.TaskName,
Version: ga.TaskVersion,
}
}
a.Source = &aSrc

return models.Artifact{
Artifact: a,
Expand Down
36 changes: 27 additions & 9 deletions flyteartifacts/pkg/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ var Migrations = []*gormigrate.Migration{
Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}
type WorkflowExecution struct {
gorm.Model
ExecutionProject string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_proj;type:varchar(64)"`
ExecutionDomain string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_dom;type:varchar(64)"`
ExecutionName string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_name;type:varchar(255)"`
InputArtifacts []Artifact `gorm:"many2many:execution_inputs;"`
}

type Artifact struct {
gorm.Model
ArtifactKeyID uint `gorm:"uniqueIndex:idx_pdnv"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"type:varchar(255);index:idx_artifact_version;uniqueIndex:idx_pdnv"`
Partitions *pgtype.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`
ArtifactKeyID uint `gorm:"not null;uniqueIndex:idx_artifact_version"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);uniqueIndex:idx_artifact_version"`
Partitions pgtype.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`

LiteralType []byte `gorm:"not null"`
LiteralValue []byte `gorm:"not null"`
Expand All @@ -30,8 +38,10 @@ var Migrations = []*gormigrate.Migration{
MetadataType string `gorm:"type:varchar(64)"`
OffloadedUserMetadata string `gorm:"type:varchar(255)"`

// Project/Domain assumed to always be the same as the Artifact
ExecutionName string `gorm:"type:varchar(255)"`
WorkflowExecutionID uint `gorm:"index:idx_artifact_wf_exec_id"`
WorkflowExecution WorkflowExecution `gorm:"foreignKey:WorkflowExecutionID;references:ID"`
NodeID string `gorm:"type:varchar(128)"`

WorkflowProject string `gorm:"type:varchar(64)"`
WorkflowDomain string `gorm:"type:varchar(64)"`
WorkflowName string `gorm:"type:varchar(255)"`
Expand All @@ -40,14 +50,22 @@ var Migrations = []*gormigrate.Migration{
TaskDomain string `gorm:"type:varchar(64)"`
TaskName string `gorm:"type:varchar(255)"`
TaskVersion string `gorm:"type:varchar(255)"`
NodeID string `gorm:"type:varchar(64)"`
// See Admin migration for note.
// Here nullable in the case of workflow output.
RetryAttempt *uint32

Principal string `gorm:"type:varchar(256)"`
}
return tx.AutoMigrate(
&ArtifactKey{}, &Artifact{},
err := tx.AutoMigrate(
&ArtifactKey{}, &Artifact{}, &WorkflowExecution{},
)
if err != nil {
return err
}

tx.Exec("CREATE INDEX idx_gin_artifact_partitions ON artifacts USING GIN (partitions)")
tx.Exec("CREATE INDEX idx_created_at ON artifacts (created_at desc)")
return tx.Error
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
Expand Down
Loading

0 comments on commit c3ce7aa

Please sign in to comment.