Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artf/lineage and fixes #4680

Merged
merged 17 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/flyte-sandbox/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ A Helm chart for the Flyte local sandbox
| flyte-binary.configuration.inline.cloudEvents.cloudEventVersion | string | `"v2"` | |
| flyte-binary.configuration.inline.cloudEvents.enable | bool | `true` | |
| flyte-binary.configuration.inline.cloudEvents.type | string | `"sandbox"` | |
| flyte-binary.configuration.inline.flyteadmin.featureGates.enableArtifacts | bool | `true` | |
| flyte-binary.configuration.inline.plugins.k8s.default-env-vars[0].FLYTE_AWS_ENDPOINT | string | `"http://{{ printf \"%s-minio\" .Release.Name | trunc 63 | trimSuffix \"-\" }}.{{ .Release.Namespace }}:9000"` | |
| flyte-binary.configuration.inline.plugins.k8s.default-env-vars[1].FLYTE_AWS_ACCESS_KEY_ID | string | `"minio"` | |
| flyte-binary.configuration.inline.plugins.k8s.default-env-vars[2].FLYTE_AWS_SECRET_ACCESS_KEY | string | `"miniostorage"` | |
Expand Down
3 changes: 3 additions & 0 deletions charts/flyte-sandbox/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ flyte-binary:
enable: true
cloudEventVersion: v2
type: sandbox
flyteadmin:
featureGates:
enableArtifacts: true
artifactsServer:
artifactBlobStoreConfig:
type: stow
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ data:
cloudEventVersion: v2
enable: true
type: sandbox
flyteadmin:
featureGates:
enableArtifacts: true
plugins:
k8s:
default-env-vars:
Expand Down Expand Up @@ -878,7 +881,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: c1BaWUVKZ3RMNHBNa1A5bQ==
haSharedSecret: SzdFS2NpQ1FCMGZpZmdvVA==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1308,7 +1311,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: 45373e78f147407fbee7ae52351e92542475be8fefff767255d1b3bdc48c3a0a
checksum/configuration: a34d42c55d42f2ad83faeb5a31a25315aec8bac20cc32bd57b34d57cf62bc84e
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1474,7 +1477,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 9c81d342cbc7f96cb7d02181f9f7516d4b6e47f6e1475c039241991ac568b851
checksum/secret: 76b53c5ec281a0fe662b1a3cf26e50a247832d71aafa79a49294e1c7e82cb462
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ data:
cloudEventVersion: v2
enable: true
type: sandbox
flyteadmin:
featureGates:
enableArtifacts: true
plugins:
k8s:
default-env-vars:
Expand Down Expand Up @@ -858,7 +861,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: YTNTZDQ0WTFQR1pNb2lJRA==
haSharedSecret: NGtneHhldjRLS0s2VnBFYg==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1256,7 +1259,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: 9502021d87030cfe424e840eee4c26900579d180008686827db9d9ef3f723a5b
checksum/configuration: 9e539e6f6c99d5c7e262b393ccaf10163e49501a72e500108d7abf1fa28efec7
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1422,7 +1425,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 4f7d40beb7c6c4fa355d9a6a5ebeda49896ecf6bb4030db160e2267ef0e4d1f8
checksum/secret: 7ce7b101d2a314d4e180934a2329db83ad951e7d832fd0855b9adecbc52dac57
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: cGlKcHZrZFRRWllURW9Xdw==
haSharedSecret: QlRBU1c2TE9Ma1dGZHdNOQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -982,7 +982,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: cfd24691ea1c055cf1ebdb9f15b6e46916c8831685a84464932920b9b05cb2a8
checksum/secret: 5b126beacb3ed19e3d0c3a580dcbc29332e445220eebd916e688e0c8d7cfdf23
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
8 changes: 0 additions & 8 deletions flyteadmin/pkg/artifacts/config.go

This file was deleted.

6 changes: 2 additions & 4 deletions flyteartifacts/pkg/db/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ func (r *RDSStorage) CreateArtifact(ctx context.Context, serviceModel models.Art
}

func (r *RDSStorage) handleUriGet(ctx context.Context, uri string) (models.Artifact, error) {
artifactID, tag, err := lib.ParseFlyteURL(uri)
artifactID, err := lib.ParseFlyteURL(uri)
if err != nil {
logger.Errorf(ctx, "Failed to parse uri [%s]: %+v", uri, err)
return models.Artifact{}, err
}
if tag != "" {
return models.Artifact{}, fmt.Errorf("tag not implemented yet")
}

logger.Debugf(ctx, "Extracted artifact id [%v] from uri [%s], using id handler", artifactID, uri)
return r.handleArtifactIdGet(ctx, artifactID)
}
Expand Down
5 changes: 4 additions & 1 deletion flyteartifacts/pkg/lib/constants.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package lib

// ArtifactKey - This is used to tag Literals as a tracking bit.
// ArtifactKey - This string is used to identify Artifacts when all you have
// is the underlying Literal. Look for this key under the literal's metadata field. This situation can arise
// when a user fetches an artifact, using something like flyte remote or flyte console, and then kicks
// off an execution using that literal.
const ArtifactKey = "_ua"
46 changes: 20 additions & 26 deletions flyteartifacts/pkg/lib/url_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,44 @@ package lib

import (
"errors"
"net/url"
"regexp"
"strings"

"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"net/url"
"regexp"
)

var flyteURLNameRe = regexp.MustCompile(`(?P<name>[\w/-]+)(?:(:(?P<tag>\w+))?)(?:(@(?P<version>\w+))?)`)
var flyteURLNameRe = regexp.MustCompile(`(?P<project>[\w-]+)/(?P<domain>[\w-]+)/(?P<name>[\w/-]+)(@(?P<version>[\w/-]+))?`)

func ParseFlyteURL(urlStr string) (core.ArtifactID, string, error) {
func ParseFlyteURL(urlStr string) (core.ArtifactID, error) {
if len(urlStr) == 0 {
return core.ArtifactID{}, "", errors.New("URL cannot be empty")
return core.ArtifactID{}, errors.New("URL cannot be empty")
}

parsed, err := url.Parse(urlStr)
if err != nil {
return core.ArtifactID{}, "", err
return core.ArtifactID{}, err
}
queryValues, err := url.ParseQuery(parsed.RawQuery)
if err != nil {
return core.ArtifactID{}, "", err
return core.ArtifactID{}, err
}
projectDomainName := strings.Split(strings.Trim(parsed.Path, "/"), "/")
if len(projectDomainName) < 3 {
return core.ArtifactID{}, "", errors.New("invalid URL format")
}
project, domain, name := projectDomainName[0], projectDomainName[1], strings.Join(projectDomainName[2:], "/")
version := ""
tag := ""

var project, domain, name, version string
queryDict := make(map[string]string)

if match := flyteURLNameRe.FindStringSubmatch(name); match != nil {
name = match[1]
if match[3] != "" {
tag = match[3]
if match := flyteURLNameRe.FindStringSubmatch(parsed.Path); match != nil {
if len(match) < 4 {
return core.ArtifactID{}, fmt.Errorf("insufficient components specified %s", parsed.Path)
}
if match[5] != "" {
project = match[1]
domain = match[2]
name = match[3]
if len(match) > 5 {
version = match[5]
}

if tag != "" && (version != "" || len(queryValues) > 0) {
return core.ArtifactID{}, "", errors.New("cannot specify tag with version or querydict")
}
} else {
return core.ArtifactID{}, fmt.Errorf("unable to parse %s", parsed.Path)
}

for key, values := range queryValues {
Expand All @@ -68,5 +62,5 @@ func ParseFlyteURL(urlStr string) (core.ArtifactID, string, error) {
Partitions: p,
}

return a, tag, nil
return a, nil
}
36 changes: 13 additions & 23 deletions flyteartifacts/pkg/lib/url_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,42 @@ import (
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
)

func TestURLParseWithTag(t *testing.T) {
artifactID, tag, err := ParseFlyteURL("flyte://av0.1/project/domain/name:tag")
func TestURLParseWithVersionAndPartitions(t *testing.T) {
artifactID, err := ParseFlyteURL("flyte://av0.1/project/domain/name@version?foo=bar&ham=spam")
expPartitions := map[string]string{"foo": "bar", "ham": "spam"}
assert.NoError(t, err)

assert.Equal(t, "project", artifactID.ArtifactKey.Project)
assert.Equal(t, "domain", artifactID.ArtifactKey.Domain)
assert.Equal(t, "name", artifactID.ArtifactKey.Name)
assert.Equal(t, "", artifactID.Version)
assert.Equal(t, "tag", tag)
assert.Nil(t, artifactID.GetPartitions())
assert.Equal(t, "version", artifactID.Version)
p := artifactID.GetPartitions()
mapP := models.PartitionsFromIdl(context.TODO(), p)
assert.Equal(t, expPartitions, mapP)
}

func TestURLParseWithVersionAndPartitions(t *testing.T) {
artifactID, tag, err := ParseFlyteURL("flyte://av0.1/project/domain/name@version?foo=bar&ham=spam")
func TestURLParseWithSlashVersionAndPartitions(t *testing.T) {
artifactID, err := ParseFlyteURL("flyte://av0.1/project/domain/name/more@version/abc/0/o0?foo=bar&ham=spam")
expPartitions := map[string]string{"foo": "bar", "ham": "spam"}
assert.NoError(t, err)

assert.Equal(t, "project", artifactID.ArtifactKey.Project)
assert.Equal(t, "domain", artifactID.ArtifactKey.Domain)
assert.Equal(t, "name", artifactID.ArtifactKey.Name)
assert.Equal(t, "version", artifactID.Version)
assert.Equal(t, "", tag)
assert.Equal(t, "name/more", artifactID.ArtifactKey.Name)
assert.Equal(t, "version/abc/0/o0", artifactID.Version)
p := artifactID.GetPartitions()
mapP := models.PartitionsFromIdl(context.TODO(), p)
assert.Equal(t, expPartitions, mapP)
}

func TestURLParseFailsWithBothTagAndPartitions(t *testing.T) {
_, _, err := ParseFlyteURL("flyte://av0.1/project/domain/name:tag?foo=bar&ham=spam")
assert.Error(t, err)
}

func TestURLParseWithBothTagAndVersion(t *testing.T) {
_, _, err := ParseFlyteURL("flyte://av0.1/project/domain/name:tag@version")
assert.Error(t, err)
}

func TestURLParseNameWithSlashes(t *testing.T) {
artifactID, tag, err := ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes")
artifactID, err := ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes")
assert.NoError(t, err)
assert.Equal(t, "project", artifactID.ArtifactKey.Project)
assert.Equal(t, "domain", artifactID.ArtifactKey.Domain)
assert.Equal(t, "name/with/slashes", artifactID.ArtifactKey.Name)
assert.Equal(t, "", tag)

artifactID, _, err = ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes?ds=2020-01-01")
artifactID, err = ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes?ds=2020-01-01")
assert.NoError(t, err)
assert.Equal(t, "name/with/slashes", artifactID.ArtifactKey.Name)
assert.Equal(t, "project", artifactID.ArtifactKey.Project)
Expand Down
20 changes: 16 additions & 4 deletions flyteartifacts/pkg/server/processor/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,25 @@ func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2

func (s *ServiceCallHandler) HandleEventExecStart(ctx context.Context, evt *event.CloudEventExecutionStart) error {

if len(evt.ArtifactIds) > 0 {
var inputsUsed []*core.ArtifactID

inputsUsed = append(inputsUsed, evt.ArtifactIds...)
for _, x := range evt.ArtifactTrackers {

dummyURI := fmt.Sprintf("flyte://av0.1/%s", x)
idWithVersion, err := lib.ParseFlyteURL(dummyURI)
if err != nil {
logger.Errorf(ctx, "Error parsing input %s for execution start: %v", x, err)
return err
}
inputsUsed = append(inputsUsed, &idWithVersion)
}

if len(inputsUsed) > 0 {
// metric
req := &artifact.ExecutionInputsRequest{
ExecutionId: evt.ExecutionId,
Inputs: evt.ArtifactIds,
Inputs: inputsUsed,
}
_, err := s.service.SetExecutionInputs(ctx, req)
if err != nil {
Expand Down Expand Up @@ -172,8 +186,6 @@ func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variabl
}

var partitions map[string]string
// todo: consider updating idl to make CreateArtifactRequest just take a full Partitions
// object rather than a mapstrstr @eapolinario @enghabu
if partialID.GetPartitions().GetValue() != nil && len(partialID.GetPartitions().GetValue()) > 0 {
partitions = make(map[string]string, len(partialID.GetPartitions().GetValue()))
for k, lv := range partialID.GetPartitions().GetValue() {
Expand Down
3 changes: 3 additions & 0 deletions flyteartifacts/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
}

func (a *ArtifactService) CreateArtifact(ctx context.Context, req *artifact.CreateArtifactRequest) (*artifact.CreateArtifactResponse, error) {

// todo: add a request validating section, check for nils, etc.

resp, err := a.Service.CreateArtifact(ctx, req)
if err != nil {
return resp, err
Expand Down Expand Up @@ -58,7 +61,7 @@
return a.Service.CreateTrigger(ctx, req)
}

func (a *ArtifactService) DeleteTrigger(ctx context.Context, req *artifact.DeleteTriggerRequest) (*artifact.DeleteTriggerResponse, error) {

Check failure on line 64 in flyteartifacts/pkg/server/server.go

View workflow job for this annotation

GitHub Actions / compile

undefined: artifact.DeleteTriggerRequest

Check failure on line 64 in flyteartifacts/pkg/server/server.go

View workflow job for this annotation

GitHub Actions / compile

undefined: artifact.DeleteTriggerResponse
return a.Service.DeleteTrigger(ctx, req)
}

Expand Down Expand Up @@ -111,7 +114,7 @@
storage := db.NewStorage(ctx, scope.NewSubScope("storage:rds"))
blobStore := blob.NewArtifactBlobStore(ctx, scope.NewSubScope("storage:s3"))
coreService := NewCoreService(storage, &blobStore, scope.NewSubScope("server"))
triggerHandler, err := NewTriggerEngine(ctx, storage, &coreService, scope.NewSubScope("triggers"))

Check failure on line 117 in flyteartifacts/pkg/server/server.go

View workflow job for this annotation

GitHub Actions / compile

cannot use &coreService (value of type *CoreService) as artifact.ArtifactRegistryServer value in argument to NewTriggerEngine: *CoreService does not implement artifact.ArtifactRegistryServer (missing method DeactivateTrigger)
if err != nil {
logger.Errorf(ctx, "Failed to create Trigger engine, stopping server. Error: %v", err)
panic(err)
Expand All @@ -124,8 +127,8 @@
panic(err)
}

handler := processor.NewServiceCallHandler(ctx, &coreService, createdArtifacts, *clientSet)

Check failure on line 130 in flyteartifacts/pkg/server/server.go

View workflow job for this annotation

GitHub Actions / compile

cannot use &coreService (value of type *CoreService) as artifact.ArtifactRegistryServer value in argument to processor.NewServiceCallHandler: *CoreService does not implement artifact.ArtifactRegistryServer (missing method DeactivateTrigger)
eventsReceiverAndHandler := processor.NewBackgroundProcessor(ctx, *eventsCfg, &coreService, createdArtifacts, scope.NewSubScope("events"))

Check failure on line 131 in flyteartifacts/pkg/server/server.go

View workflow job for this annotation

GitHub Actions / compile

cannot use &coreService (value of type *CoreService) as artifact.ArtifactRegistryServer value in argument to processor.NewBackgroundProcessor: *CoreService does not implement artifact.ArtifactRegistryServer (missing method DeactivateTrigger)
if eventsReceiverAndHandler != nil {
go func() {
logger.Info(ctx, "Starting Artifact service background processing...")
Expand Down
21 changes: 18 additions & 3 deletions flyteartifacts/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/lib"

"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
Expand All @@ -14,16 +15,30 @@
type CoreService struct {
Storage StorageInterface
BlobStore BlobStoreInterface
// SearchHandler SearchHandlerInterface
}

// This string is a tracker basically that will be installed in the metadata of the literal. See the ArtifactKey constant for more information.
func (c *CoreService) getTrackingString(request *artifact.CreateArtifactRequest) string {
ak := request.ArtifactKey
t := fmt.Sprintf("%s/%s/%s@%s", ak.Project, ak.Domain, ak.Name, request.Version)

return t
}

func (c *CoreService) CreateArtifact(ctx context.Context, request *artifact.CreateArtifactRequest) (*artifact.CreateArtifactResponse, error) {

// todo: gatepr _ua tracking bit to be installed
if request == nil {
// todo: move one layer higher to server.go
if request == nil || request.GetArtifactKey() == nil {
logger.Errorf(ctx, "Ignoring nil or partially nil request")
return nil, nil
}

if request.GetSpec().GetValue().Metadata == nil {
request.GetSpec().GetValue().Metadata = make(map[string]string, 1)
}
trackingStr := c.getTrackingString(request)
request.GetSpec().GetValue().Metadata[lib.ArtifactKey] = trackingStr

artifactObj, err := models.CreateArtifactModelFromRequest(ctx, request.ArtifactKey, request.Spec, request.Version, request.Partitions, request.Tag, request.Source)
if err != nil {
logger.Errorf(ctx, "Failed to validate Create request: %v", err)
Expand Down Expand Up @@ -92,9 +107,9 @@
return &artifact.CreateTriggerResponse{}, nil
}

func (c *CoreService) DeleteTrigger(ctx context.Context, request *artifact.DeleteTriggerRequest) (*artifact.DeleteTriggerResponse, error) {

Check failure on line 110 in flyteartifacts/pkg/server/service.go

View workflow job for this annotation

GitHub Actions / compile

undefined: artifact.DeleteTriggerRequest

Check failure on line 110 in flyteartifacts/pkg/server/service.go

View workflow job for this annotation

GitHub Actions / compile

undefined: artifact.DeleteTriggerResponse
// Todo: gatepr - This needs to be implemented before merging.
return &artifact.DeleteTriggerResponse{}, nil

Check failure on line 112 in flyteartifacts/pkg/server/service.go

View workflow job for this annotation

GitHub Actions / compile

undefined: artifact.DeleteTriggerResponse
}

func (c *CoreService) AddTag(ctx context.Context, request *artifact.AddTagRequest) (*artifact.AddTagResponse, error) {
Expand Down
Loading