Upload local libraries even if they don't have artifact defined (#1664)
## Changes
Previously, DABs required every library referenced in the configuration to have a
corresponding artifact section. This is neither necessary nor flexible, because local
libraries might be built outside of the DABs context. It also created hard-to-follow
logic in the code, where libraries were back-referenced to artifacts.
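For illustration only, a minimal sketch of what "local library" means here: a library reference in the job configuration that does not already point at a remote location and therefore needs to be uploaded. The helper name `isLocalLibrary` and the set of remote prefixes are assumptions, not the CLI's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// isLocalLibrary is a hypothetical helper: it treats a library reference as
// "local" (i.e. something DABs should upload) when it does not already point
// at a remote location such as the workspace file system, a UC volume, or an
// object store path. The prefix list is an illustrative assumption.
func isLocalLibrary(path string) bool {
	remotePrefixes := []string{"/Workspace/", "/Volumes/", "dbfs:/", "s3:/", "abfss:/"}
	for _, p := range remotePrefixes {
		if strings.HasPrefix(path, p) {
			return false
		}
	}
	return true
}

func main() {
	// A relative wheel path built outside of DABs still gets uploaded,
	// even though no artifacts section references it.
	fmt.Println(isLocalLibrary("./dist/my_lib-0.1.0-py3-none-any.whl")) // true
	fmt.Println(isLocalLibrary("/Workspace/Shared/libs/my_lib.whl"))    // false
}
```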


This PR does 3 things:
1. Allows all local libraries referenced in the DABs config to be uploaded to the remote workspace
2. Simplifies the upload and glob-reference expansion logic by doing it in a single place
3. Speeds things up by uploading each library only once and running uploads in parallel (a rough sketch follows this list)
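
A rough sketch, assuming a hypothetical `uploadFile` helper and an errgroup-based fan-out, of what points 2 and 3 can look like: expand glob references in one place, deduplicate by source path, and upload each distinct library in parallel. This is illustrative, not the PR's actual code.

```go
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

// uploadFile is a stand-in for the real upload step (e.g. a filer write).
func uploadFile(ctx context.Context, source string) error {
	fmt.Printf("Uploading %s...\n", source)
	return nil
}

// uploadLibraries expands glob references, deduplicates the matches, and
// uploads each distinct file in parallel, failing fast on the first error.
func uploadLibraries(ctx context.Context, patterns []string) error {
	seen := make(map[string]bool)
	g, ctx := errgroup.WithContext(ctx)
	for _, pattern := range patterns {
		matches, err := filepath.Glob(pattern)
		if err != nil {
			return err
		}
		for _, src := range matches {
			if seen[src] {
				continue // the same library may be referenced by many tasks
			}
			seen[src] = true
			src := src // capture loop variable for the goroutine
			g.Go(func() error {
				return uploadFile(ctx, src)
			})
		}
	}
	return g.Wait()
}

func main() {
	// Library references as they might appear across job tasks.
	refs := []string{"dist/*.whl", "dist/my_lib-0.1.0-py3-none-any.whl"}
	if err := uploadLibraries(context.Background(), refs); err != nil {
		fmt.Println("upload failed:", err)
	}
}
```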

## Tests
Added unit and integration tests, and verified the change is backward
compatible (no changes to existing tests).

---------

Co-authored-by: Pieter Noordhuis <[email protected]>
andrewnester and pietern authored Aug 14, 2024
1 parent 7ae80de commit 48ff18e
Showing 21 changed files with 1,102 additions and 777 deletions.
191 changes: 0 additions & 191 deletions bundle/artifacts/artifacts.go
@@ -1,25 +1,16 @@
package artifacts

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/artifacts/whl"
	"github.com/databricks/cli/bundle/config"
	"github.com/databricks/cli/bundle/config/mutator"
	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/cli/libs/cmdio"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/cli/libs/filer"
	"github.com/databricks/cli/libs/log"
	"github.com/databricks/databricks-sdk-go"
)

type mutatorFactory = func(name string) bundle.Mutator
@@ -28,8 +19,6 @@ var buildMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactTy
	config.ArtifactPythonWheel: whl.Build,
}

var uploadMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactType]mutatorFactory{}

var prepareMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactType]mutatorFactory{
	config.ArtifactPythonWheel: whl.Prepare,
}
@@ -43,15 +32,6 @@ func getBuildMutator(t config.ArtifactType, name string) bundle.Mutator {
	return mutatorFactory(name)
}

func getUploadMutator(t config.ArtifactType, name string) bundle.Mutator {
	mutatorFactory, ok := uploadMutators[t]
	if !ok {
		mutatorFactory = BasicUpload
	}

	return mutatorFactory(name)
}

func getPrepareMutator(t config.ArtifactType, name string) bundle.Mutator {
	mutatorFactory, ok := prepareMutators[t]
	if !ok {
@@ -92,174 +72,3 @@ func (m *basicBuild) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnosti

	return nil
}

// Basic Upload defines a general upload mutator which uploads artifact as a library to workspace
type basicUpload struct {
	name string
}

func BasicUpload(name string) bundle.Mutator {
	return &basicUpload{name: name}
}

func (m *basicUpload) Name() string {
	return fmt.Sprintf("artifacts.Upload(%s)", m.name)
}

func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	artifact, ok := b.Config.Artifacts[m.name]
	if !ok {
		return diag.Errorf("artifact doesn't exist: %s", m.name)
	}

	if len(artifact.Files) == 0 {
		return diag.Errorf("artifact source is not configured: %s", m.name)
	}

	uploadPath, err := getUploadBasePath(b)
	if err != nil {
		return diag.FromErr(err)
	}

	client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath)
	if err != nil {
		return diag.FromErr(err)
	}

	err = uploadArtifact(ctx, b, artifact, uploadPath, client)
	if err != nil {
		return diag.Errorf("upload for %s failed, error: %v", m.name, err)
	}

	return nil
}

func getFilerForArtifacts(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) {
	if isVolumesPath(uploadPath) {
		return filer.NewFilesClient(w, uploadPath)
	}
	return filer.NewWorkspaceFilesClient(w, uploadPath)
}

func isVolumesPath(path string) bool {
	return strings.HasPrefix(path, "/Volumes/")
}

func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
	for i := range a.Files {
		f := &a.Files[i]

		filename := filepath.Base(f.Source)
		cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename))

		err := uploadArtifactFile(ctx, f.Source, client)
		if err != nil {
			return err
		}

		log.Infof(ctx, "Upload succeeded")
		f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))
		remotePath := f.RemotePath

		if !strings.HasPrefix(f.RemotePath, "/Workspace/") && !strings.HasPrefix(f.RemotePath, "/Volumes/") {
			wsfsBase := "/Workspace"
			remotePath = path.Join(wsfsBase, f.RemotePath)
		}

		for _, job := range b.Config.Resources.Jobs {
			rewriteArtifactPath(b, f, job, remotePath)
		}
	}

	return nil
}

func rewriteArtifactPath(b *bundle.Bundle, f *config.ArtifactFile, job *resources.Job, remotePath string) {
	// Rewrite artifact path in job task libraries
	for i := range job.Tasks {
		task := &job.Tasks[i]
		for j := range task.Libraries {
			lib := &task.Libraries[j]
			if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) {
				lib.Whl = remotePath
			}
			if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) {
				lib.Jar = remotePath
			}
		}

		// Rewrite artifact path in job task libraries for ForEachTask
		if task.ForEachTask != nil {
			forEachTask := task.ForEachTask
			for j := range forEachTask.Task.Libraries {
				lib := &forEachTask.Task.Libraries[j]
				if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) {
					lib.Whl = remotePath
				}
				if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) {
					lib.Jar = remotePath
				}
			}
		}
	}

	// Rewrite artifact path in job environments
	for i := range job.Environments {
		env := &job.Environments[i]
		if env.Spec == nil {
			continue
		}

		for j := range env.Spec.Dependencies {
			lib := env.Spec.Dependencies[j]
			if isArtifactMatchLibrary(f, lib, b) {
				env.Spec.Dependencies[j] = remotePath
			}
		}
	}
}

func isArtifactMatchLibrary(f *config.ArtifactFile, libPath string, b *bundle.Bundle) bool {
	if !filepath.IsAbs(libPath) {
		libPath = filepath.Join(b.RootPath, libPath)
	}

	// libPath can be a glob pattern, so do the match first
	matches, err := filepath.Glob(libPath)
	if err != nil {
		return false
	}

	for _, m := range matches {
		if m == f.Source {
			return true
		}
	}

	return false
}

// Function to upload artifact file to Workspace
func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error {
	raw, err := os.ReadFile(file)
	if err != nil {
		return fmt.Errorf("unable to read %s: %w", file, errors.Unwrap(err))
	}

	filename := filepath.Base(file)
	err = client.Write(ctx, filename, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories)
	if err != nil {
		return fmt.Errorf("unable to import %s: %w", filename, err)
	}

	return nil
}

func getUploadBasePath(b *bundle.Bundle) (string, error) {
	artifactPath := b.Config.Workspace.ArtifactPath
	if artifactPath == "" {
		return "", fmt.Errorf("remote artifact path not configured")
	}

	return path.Join(artifactPath, ".internal"), nil
}