Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artf/triggers #4394

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions flyteartifacts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/NYTimes/gizmo v1.3.6
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v1.9.5
github.com/go-gormigrate/gormigrate/v2 v2.1.1
Expand All @@ -19,6 +20,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/gorm v1.25.5
)

Expand All @@ -43,7 +45,6 @@ require (
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/ncw/swift v1.0.53 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
Expand All @@ -87,6 +89,7 @@ require (
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
Expand All @@ -99,7 +102,6 @@ require (
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions flyteartifacts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
Expand Down Expand Up @@ -414,6 +415,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -473,6 +475,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -963,6 +966,7 @@ k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
Expand Down
35 changes: 32 additions & 3 deletions flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

type ArtifactKey struct {
gorm.Model
Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
Project string `gorm:"uniqueIndex:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}

type Artifact struct {
Expand Down Expand Up @@ -41,3 +41,32 @@ type Artifact struct {
// Here nullable in the case of workflow output.
RetryAttempt *uint32
}

type TriggerKey struct {
gorm.Model
Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
RunsOn []ArtifactKey `gorm:"many2many:active_trigger_artifact_keys;"`
}

type LaunchPlanID struct {
Name string `gorm:"not null;index:idx_lp_id;type:varchar(255)"`
Version string `gorm:"not null;type:varchar(255);index:idx_launch_plan_version"`
}

type Trigger struct {
gorm.Model
TriggerKeyID uint `gorm:"uniqueIndex:idx_trigger_pdnv"`
TriggerKey TriggerKey `gorm:"foreignKey:TriggerKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_trigger_version;uniqueIndex:idx_trigger_pdnv"`

// Unlike the one in the TriggerKey table, these are the list of artifact keys as specified by the user
// for this specific version. Currently just the key but can add additional fields in the future.
RunsOn []ArtifactKey `gorm:"many2many:trigger_ids_artifact_keys;"`

Active bool `gorm:"index:idx_active"`
LaunchPlanID LaunchPlanID `gorm:"embedded"`
LaunchPlanSpec []byte `gorm:"not null"`
LaunchPlanClosure []byte `gorm:"not null"`
}
98 changes: 95 additions & 3 deletions flyteartifacts/pkg/db/gorm_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package db

import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/jackc/pgx/v5/pgtype"
)

Expand Down Expand Up @@ -89,7 +92,6 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {

// gatepr: principal is missing still - can be added following discussion on source object.
// taskexecution and additional source information to be added when resolved.
// gatepr: implement tags
a := artifact.Artifact{
ArtifactId: &core.ArtifactID{
ArtifactKey: &core.ArtifactKey{
Expand Down Expand Up @@ -123,7 +125,97 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {
return models.Artifact{
Artifact: a,
OffloadedMetadata: "",
LiteralTypeBytes: nil,
LiteralValueBytes: nil,
LiteralTypeBytes: ga.LiteralType,
LiteralValueBytes: ga.LiteralValue,
}, nil
}

func ServiceToGormTrigger(serviceTrigger models.Trigger) Trigger {

t := Trigger{
TriggerKey: TriggerKey{
Project: serviceTrigger.Project,
Domain: serviceTrigger.Domain,
Name: serviceTrigger.Name,
},
Version: serviceTrigger.Version,
Active: serviceTrigger.Active,
LaunchPlanID: LaunchPlanID{
Name: serviceTrigger.LaunchPlan.Id.Name,
Version: serviceTrigger.LaunchPlan.Id.Version,
},
LaunchPlanSpec: serviceTrigger.SpecBytes,
LaunchPlanClosure: serviceTrigger.ClosureBytes,
}

var runsOn = make([]ArtifactKey, len(serviceTrigger.RunsOn))
for i, a := range serviceTrigger.RunsOn {
runsOn[i] = ArtifactKey{
Project: a.ArtifactKey.Project,
Domain: a.ArtifactKey.Domain,
Name: a.ArtifactKey.Name,
}
}
t.RunsOn = runsOn

return t
}

func GormToServiceTrigger(gormTrigger Trigger) (models.Trigger, error) {
spec := &admin.LaunchPlanSpec{}
closure := &admin.LaunchPlanClosure{}
if err := proto.Unmarshal(gormTrigger.LaunchPlanSpec, spec); err != nil {
return models.Trigger{}, err
}
if err := proto.Unmarshal(gormTrigger.LaunchPlanClosure, closure); err != nil {
return models.Trigger{}, err
}
lpID := core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Project: gormTrigger.TriggerKey.Project,
Domain: gormTrigger.TriggerKey.Domain,
Name: gormTrigger.LaunchPlanID.Name,
Version: gormTrigger.Version, // gormTrigger.LaunchPlanID.Version,
}
t := models.Trigger{
Project: gormTrigger.TriggerKey.Project,
Domain: gormTrigger.TriggerKey.Domain,
Name: gormTrigger.TriggerKey.Name,
Version: gormTrigger.Version,
Active: gormTrigger.Active,
LaunchPlanID: lpID,
LaunchPlan: &admin.LaunchPlan{
Id: &lpID,
Spec: spec,
Closure: closure,
},
SpecBytes: gormTrigger.LaunchPlanSpec,
ClosureBytes: gormTrigger.LaunchPlanClosure,
}

// TODO: This is a copy/paste of the code in transformers.go. Refactor.
// Basically the DB model only has artifact keys, not whole artifact IDs including partitions
// so pull the artifact IDs again from the spec.
lc := spec.GetEntityMetadata().GetLaunchConditions()

var err error
idlTrigger := core.Trigger{}
err = ptypes.UnmarshalAny(lc, &idlTrigger)
if err != nil {
logger.Errorf(context.TODO(), "Failed to unmarshal launch conditions to idl, metadata: [%+v]", spec.GetEntityMetadata())
return models.Trigger{}, err
}
if len(idlTrigger.Triggers) == 0 {
return models.Trigger{}, fmt.Errorf("invalid request to CreateTrigger, launch conditions cannot be empty")
}
var runsOnArtifactIDs = make([]core.ArtifactID, len(idlTrigger.Triggers))
for i, t := range idlTrigger.Triggers {
runsOnArtifactIDs[i] = *t
runsOnArtifactIDs[i].ArtifactKey.Project = lpID.Project
runsOnArtifactIDs[i].ArtifactKey.Domain = lpID.Domain
}

t.RunsOn = runsOnArtifactIDs

return t, nil
}
4 changes: 4 additions & 0 deletions flyteartifacts/pkg/db/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type gormMetrics struct {
GetDuration promutils.StopWatch
UpdateDuration promutils.StopWatch
SearchDuration promutils.StopWatch

CreateTriggerDuration promutils.StopWatch
}

func newMetrics(scope promutils.Scope) gormMetrics {
Expand All @@ -26,5 +28,7 @@ func newMetrics(scope promutils.Scope) gormMetrics {
"update", "time taken to update an entry", time.Millisecond),
SearchDuration: scope.MustNewStopWatch(
"search", "time taken for searching", time.Millisecond),
CreateTriggerDuration: scope.MustNewStopWatch(
"createT", "time taken to create a new trigger", time.Millisecond),
}
}
43 changes: 42 additions & 1 deletion flyteartifacts/pkg/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,48 @@ var Migrations = []*gormigrate.Migration{
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
"artifact_keys", "artifacts",
"artifacts", "artifact_keys",
)
},
},
{
ID: "2023-10-22-trigger",
Migrate: func(tx *gorm.DB) error {
type TriggerKey struct {
gorm.Model
Project string `gorm:"index:idx_t_pdn;index:idx_t_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_t_pdn;index:idx_t_dom;type:varchar(64)"`
Name string `gorm:"index:idx_t_pdn;index:idx_t_name;type:varchar(255)"`
RunsOn []ArtifactKey `gorm:"many2many:active_trigger_artifact_keys;"`
}

type LaunchPlanID struct {
Name string `gorm:"not null;index:idx_lp_id;type:varchar(255)"`
Version string `gorm:"not null;type:varchar(255);index:idx_launch_plan_version"`
}

type Trigger struct {
gorm.Model
TriggerKeyID uint `gorm:"uniqueIndex:idx_trigger_pdnv"`
TriggerKey TriggerKey `gorm:"foreignKey:TriggerKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_trigger_version;uniqueIndex:idx_trigger_pdnv"`

// Unlike the one in the TriggerKey table, these are the list of artifact keys as specified by the user
// for this specific version. Currently just the key but can add additional fields in the future.
RunsOn []ArtifactKey `gorm:"many2many:trigger_ids_artifact_keys;"`

Active bool `gorm:"index:idx_t_active"`
LaunchPlanID LaunchPlanID `gorm:"embedded"`
LaunchPlanSpec []byte `gorm:"not null"`
LaunchPlanClosure []byte `gorm:"not null"`
}
return tx.AutoMigrate(
&TriggerKey{}, &Trigger{},
)
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
"triggers", "trigger_keys",
)
},
},
Expand Down
Loading
Loading