Skip to content

Commit

Permalink
Artf/lineage and fixes (#4680)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Jan 9, 2024
1 parent 7557d62 commit de919c0
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 77 deletions.
1 change: 1 addition & 0 deletions charts/flyte-sandbox/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ A Helm chart for the Flyte local sandbox
| flyte-binary.configuration.inline.cloudEvents.cloudEventVersion | string | `"v2"` | |
| flyte-binary.configuration.inline.cloudEvents.enable | bool | `true` | |
| flyte-binary.configuration.inline.cloudEvents.type | string | `"sandbox"` | |
| flyte-binary.configuration.inline.flyteadmin.featureGates.enableArtifacts | bool | `true` | |
| flyte-binary.configuration.inline.plugins.k8s.default-env-vars[0].FLYTE_AWS_ENDPOINT | string | `"http://{{ printf \"%s-minio\" .Release.Name | trunc 63 | trimSuffix \"-\" }}.{{ .Release.Namespace }}:9000"` | |
| flyte-binary.configuration.inline.plugins.k8s.default-env-vars[1].FLYTE_AWS_ACCESS_KEY_ID | string | `"minio"` | |
| flyte-binary.configuration.inline.plugins.k8s.default-env-vars[2].FLYTE_AWS_SECRET_ACCESS_KEY | string | `"miniostorage"` | |
Expand Down
3 changes: 3 additions & 0 deletions charts/flyte-sandbox/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ flyte-binary:
enable: true
cloudEventVersion: v2
type: sandbox
flyteadmin:
featureGates:
enableArtifacts: true
artifactsServer:
artifactBlobStoreConfig:
type: stow
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ data:
cloudEventVersion: v2
enable: true
type: sandbox
flyteadmin:
featureGates:
enableArtifacts: true
plugins:
k8s:
default-env-vars:
Expand Down Expand Up @@ -878,7 +881,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: c1BaWUVKZ3RMNHBNa1A5bQ==
haSharedSecret: SzdFS2NpQ1FCMGZpZmdvVA==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1308,7 +1311,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: 45373e78f147407fbee7ae52351e92542475be8fefff767255d1b3bdc48c3a0a
checksum/configuration: a34d42c55d42f2ad83faeb5a31a25315aec8bac20cc32bd57b34d57cf62bc84e
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1474,7 +1477,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 9c81d342cbc7f96cb7d02181f9f7516d4b6e47f6e1475c039241991ac568b851
checksum/secret: 76b53c5ec281a0fe662b1a3cf26e50a247832d71aafa79a49294e1c7e82cb462
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ data:
cloudEventVersion: v2
enable: true
type: sandbox
flyteadmin:
featureGates:
enableArtifacts: true
plugins:
k8s:
default-env-vars:
Expand Down Expand Up @@ -858,7 +861,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: YTNTZDQ0WTFQR1pNb2lJRA==
haSharedSecret: NGtneHhldjRLS0s2VnBFYg==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1256,7 +1259,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: 9502021d87030cfe424e840eee4c26900579d180008686827db9d9ef3f723a5b
checksum/configuration: 9e539e6f6c99d5c7e262b393ccaf10163e49501a72e500108d7abf1fa28efec7
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1422,7 +1425,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 4f7d40beb7c6c4fa355d9a6a5ebeda49896ecf6bb4030db160e2267ef0e4d1f8
checksum/secret: 7ce7b101d2a314d4e180934a2329db83ad951e7d832fd0855b9adecbc52dac57
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: cGlKcHZrZFRRWllURW9Xdw==
haSharedSecret: QlRBU1c2TE9Ma1dGZHdNOQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -982,7 +982,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: cfd24691ea1c055cf1ebdb9f15b6e46916c8831685a84464932920b9b05cb2a8
checksum/secret: 5b126beacb3ed19e3d0c3a580dcbc29332e445220eebd916e688e0c8d7cfdf23
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
8 changes: 0 additions & 8 deletions flyteadmin/pkg/artifacts/config.go

This file was deleted.

6 changes: 2 additions & 4 deletions flyteartifacts/pkg/db/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ func (r *RDSStorage) CreateArtifact(ctx context.Context, serviceModel models.Art
}

func (r *RDSStorage) handleUriGet(ctx context.Context, uri string) (models.Artifact, error) {
artifactID, tag, err := lib.ParseFlyteURL(uri)
artifactID, err := lib.ParseFlyteURL(uri)
if err != nil {
logger.Errorf(ctx, "Failed to parse uri [%s]: %+v", uri, err)
return models.Artifact{}, err
}
if tag != "" {
return models.Artifact{}, fmt.Errorf("tag not implemented yet")
}

logger.Debugf(ctx, "Extracted artifact id [%v] from uri [%s], using id handler", artifactID, uri)
return r.handleArtifactIdGet(ctx, artifactID)
}
Expand Down
5 changes: 4 additions & 1 deletion flyteartifacts/pkg/lib/constants.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package lib

// ArtifactKey - This is used to tag Literals as a tracking bit.
// ArtifactKey is the metadata key used to identify Artifacts when all you have
// is the underlying Literal: look for this key under the literal's metadata field.
// This situation can arise when a user fetches an artifact, using something like
// flyte remote or flyte console, and then kicks off an execution using that literal.
const ArtifactKey = "_ua"
46 changes: 20 additions & 26 deletions flyteartifacts/pkg/lib/url_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,44 @@ package lib

import (
"errors"
"net/url"
"regexp"
"strings"

"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"net/url"
"regexp"
)

var flyteURLNameRe = regexp.MustCompile(`(?P<name>[\w/-]+)(?:(:(?P<tag>\w+))?)(?:(@(?P<version>\w+))?)`)
var flyteURLNameRe = regexp.MustCompile(`(?P<project>[\w-]+)/(?P<domain>[\w-]+)/(?P<name>[\w/-]+)(@(?P<version>[\w/-]+))?`)

func ParseFlyteURL(urlStr string) (core.ArtifactID, string, error) {
func ParseFlyteURL(urlStr string) (core.ArtifactID, error) {
if len(urlStr) == 0 {
return core.ArtifactID{}, "", errors.New("URL cannot be empty")
return core.ArtifactID{}, errors.New("URL cannot be empty")
}

parsed, err := url.Parse(urlStr)
if err != nil {
return core.ArtifactID{}, "", err
return core.ArtifactID{}, err
}
queryValues, err := url.ParseQuery(parsed.RawQuery)
if err != nil {
return core.ArtifactID{}, "", err
return core.ArtifactID{}, err
}
projectDomainName := strings.Split(strings.Trim(parsed.Path, "/"), "/")
if len(projectDomainName) < 3 {
return core.ArtifactID{}, "", errors.New("invalid URL format")
}
project, domain, name := projectDomainName[0], projectDomainName[1], strings.Join(projectDomainName[2:], "/")
version := ""
tag := ""

var project, domain, name, version string
queryDict := make(map[string]string)

if match := flyteURLNameRe.FindStringSubmatch(name); match != nil {
name = match[1]
if match[3] != "" {
tag = match[3]
if match := flyteURLNameRe.FindStringSubmatch(parsed.Path); match != nil {
if len(match) < 4 {
return core.ArtifactID{}, fmt.Errorf("insufficient components specified %s", parsed.Path)
}
if match[5] != "" {
project = match[1]
domain = match[2]
name = match[3]
if len(match) > 5 {
version = match[5]
}

if tag != "" && (version != "" || len(queryValues) > 0) {
return core.ArtifactID{}, "", errors.New("cannot specify tag with version or querydict")
}
} else {
return core.ArtifactID{}, fmt.Errorf("unable to parse %s", parsed.Path)
}

for key, values := range queryValues {
Expand All @@ -68,5 +62,5 @@ func ParseFlyteURL(urlStr string) (core.ArtifactID, string, error) {
Partitions: p,
}

return a, tag, nil
return a, nil
}
36 changes: 13 additions & 23 deletions flyteartifacts/pkg/lib/url_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,42 @@ import (
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
)

func TestURLParseWithTag(t *testing.T) {
artifactID, tag, err := ParseFlyteURL("flyte://av0.1/project/domain/name:tag")
func TestURLParseWithVersionAndPartitions(t *testing.T) {
artifactID, err := ParseFlyteURL("flyte://av0.1/project/domain/name@version?foo=bar&ham=spam")
expPartitions := map[string]string{"foo": "bar", "ham": "spam"}
assert.NoError(t, err)

assert.Equal(t, "project", artifactID.ArtifactKey.Project)
assert.Equal(t, "domain", artifactID.ArtifactKey.Domain)
assert.Equal(t, "name", artifactID.ArtifactKey.Name)
assert.Equal(t, "", artifactID.Version)
assert.Equal(t, "tag", tag)
assert.Nil(t, artifactID.GetPartitions())
assert.Equal(t, "version", artifactID.Version)
p := artifactID.GetPartitions()
mapP := models.PartitionsFromIdl(context.TODO(), p)
assert.Equal(t, expPartitions, mapP)
}

func TestURLParseWithVersionAndPartitions(t *testing.T) {
artifactID, tag, err := ParseFlyteURL("flyte://av0.1/project/domain/name@version?foo=bar&ham=spam")
func TestURLParseWithSlashVersionAndPartitions(t *testing.T) {
artifactID, err := ParseFlyteURL("flyte://av0.1/project/domain/name/more@version/abc/0/o0?foo=bar&ham=spam")
expPartitions := map[string]string{"foo": "bar", "ham": "spam"}
assert.NoError(t, err)

assert.Equal(t, "project", artifactID.ArtifactKey.Project)
assert.Equal(t, "domain", artifactID.ArtifactKey.Domain)
assert.Equal(t, "name", artifactID.ArtifactKey.Name)
assert.Equal(t, "version", artifactID.Version)
assert.Equal(t, "", tag)
assert.Equal(t, "name/more", artifactID.ArtifactKey.Name)
assert.Equal(t, "version/abc/0/o0", artifactID.Version)
p := artifactID.GetPartitions()
mapP := models.PartitionsFromIdl(context.TODO(), p)
assert.Equal(t, expPartitions, mapP)
}

func TestURLParseFailsWithBothTagAndPartitions(t *testing.T) {
_, _, err := ParseFlyteURL("flyte://av0.1/project/domain/name:tag?foo=bar&ham=spam")
assert.Error(t, err)
}

func TestURLParseWithBothTagAndVersion(t *testing.T) {
_, _, err := ParseFlyteURL("flyte://av0.1/project/domain/name:tag@version")
assert.Error(t, err)
}

func TestURLParseNameWithSlashes(t *testing.T) {
artifactID, tag, err := ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes")
artifactID, err := ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes")
assert.NoError(t, err)
assert.Equal(t, "project", artifactID.ArtifactKey.Project)
assert.Equal(t, "domain", artifactID.ArtifactKey.Domain)
assert.Equal(t, "name/with/slashes", artifactID.ArtifactKey.Name)
assert.Equal(t, "", tag)

artifactID, _, err = ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes?ds=2020-01-01")
artifactID, err = ParseFlyteURL("flyte://av0.1/project/domain/name/with/slashes?ds=2020-01-01")
assert.NoError(t, err)
assert.Equal(t, "name/with/slashes", artifactID.ArtifactKey.Name)
assert.Equal(t, "project", artifactID.ArtifactKey.Project)
Expand Down
20 changes: 16 additions & 4 deletions flyteartifacts/pkg/server/processor/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,25 @@ func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2

func (s *ServiceCallHandler) HandleEventExecStart(ctx context.Context, evt *event.CloudEventExecutionStart) error {

if len(evt.ArtifactIds) > 0 {
var inputsUsed []*core.ArtifactID

inputsUsed = append(inputsUsed, evt.ArtifactIds...)
for _, x := range evt.ArtifactTrackers {

dummyURI := fmt.Sprintf("flyte://av0.1/%s", x)
idWithVersion, err := lib.ParseFlyteURL(dummyURI)
if err != nil {
logger.Errorf(ctx, "Error parsing input %s for execution start: %v", x, err)
return err
}
inputsUsed = append(inputsUsed, &idWithVersion)
}

if len(inputsUsed) > 0 {
// metric
req := &artifact.ExecutionInputsRequest{
ExecutionId: evt.ExecutionId,
Inputs: evt.ArtifactIds,
Inputs: inputsUsed,
}
_, err := s.service.SetExecutionInputs(ctx, req)
if err != nil {
Expand Down Expand Up @@ -172,8 +186,6 @@ func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variabl
}

var partitions map[string]string
// todo: consider updating idl to make CreateArtifactRequest just take a full Partitions
// object rather than a mapstrstr @eapolinario @enghabu
if partialID.GetPartitions().GetValue() != nil && len(partialID.GetPartitions().GetValue()) > 0 {
partitions = make(map[string]string, len(partialID.GetPartitions().GetValue()))
for k, lv := range partialID.GetPartitions().GetValue() {
Expand Down
3 changes: 3 additions & 0 deletions flyteartifacts/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type ArtifactService struct {
}

func (a *ArtifactService) CreateArtifact(ctx context.Context, req *artifact.CreateArtifactRequest) (*artifact.CreateArtifactResponse, error) {

// todo: add a request validating section, check for nils, etc.

resp, err := a.Service.CreateArtifact(ctx, req)
if err != nil {
return resp, err
Expand Down
21 changes: 18 additions & 3 deletions flyteartifacts/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/lib"

"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
Expand All @@ -14,16 +15,30 @@ import (
// CoreService bundles the dependencies used by the artifact service methods
// defined on it (e.g. CreateArtifact): a Storage backend and a BlobStore.
// NOTE(review): the exact responsibilities of StorageInterface and
// BlobStoreInterface are declared elsewhere - confirm against their definitions.
type CoreService struct {
Storage StorageInterface
BlobStore BlobStoreInterface
// SearchHandler SearchHandlerInterface
}

// getTrackingString builds the tracker value for the artifact described by
// request, in the form "<project>/<domain>/<name>@<version>". The result is
// meant to be installed in the metadata of the literal under lib.ArtifactKey;
// see that constant for more information.
//
// The generated protobuf getters are used instead of direct field access so
// that a nil request or a nil ArtifactKey yields empty components rather than
// a nil-pointer panic.
func (c *CoreService) getTrackingString(request *artifact.CreateArtifactRequest) string {
	ak := request.GetArtifactKey()
	return fmt.Sprintf("%s/%s/%s@%s", ak.GetProject(), ak.GetDomain(), ak.GetName(), request.GetVersion())
}

func (c *CoreService) CreateArtifact(ctx context.Context, request *artifact.CreateArtifactRequest) (*artifact.CreateArtifactResponse, error) {

// todo: gatepr _ua tracking bit to be installed
if request == nil {
// todo: move one layer higher to server.go
if request == nil || request.GetArtifactKey() == nil {
logger.Errorf(ctx, "Ignoring nil or partially nil request")
return nil, nil
}

if request.GetSpec().GetValue().Metadata == nil {
request.GetSpec().GetValue().Metadata = make(map[string]string, 1)
}
trackingStr := c.getTrackingString(request)
request.GetSpec().GetValue().Metadata[lib.ArtifactKey] = trackingStr

artifactObj, err := models.CreateArtifactModelFromRequest(ctx, request.ArtifactKey, request.Spec, request.Version, request.Partitions, request.Tag, request.Source)
if err != nil {
logger.Errorf(ctx, "Failed to validate Create request: %v", err)
Expand Down

0 comments on commit de919c0

Please sign in to comment.