Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export app image and cache image in parallel #1247

Merged
merged 6 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion acceptance/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,10 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe
h.WithArgs(exportArgs...),
)
h.AssertStringContains(t, output, "Saving "+exportedImageName)

// To detect whether the export of cacheImage and exportedImage is successful
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we enable parallel export in this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a look on how to enable that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be passing "PASS: TestExporter/acceptance-exporter/0.10/registry_case/first_build/cache/cache_image_case/is_created_with_parallel_export_enabled" -> is this this test enough @natalieparellano ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think latest test-windows is failing due to an unrelated error. It should ideally succeed on retry - I don't have option to retry workflow @natalieparellano . I have added test and fixed warning message for -parallel export option. I will wait for end user testing results to see if we need to make more changes.

h.Run(t, exec.Command("docker", "pull", exportedImageName))
assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime)
h.Run(t, exec.Command("docker", "pull", cacheImageName))
})

it("is created with empty layer", func() {
Expand Down
5 changes: 5 additions & 0 deletions cmd/lifecycle/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func FlagPlatformDir(platformDir *string) {
flagSet.StringVar(platformDir, "platform", *platformDir, "path to platform directory")
}

// FlagParallelExport parses `parallel` flag
func FlagParallelExport(parallelExport *bool) {
flagSet.BoolVar(parallelExport, "parallel", *parallelExport, "export app image and cache image in parallel")
}

func FlagPreviousImage(previousImage *string) {
flagSet.StringVar(previousImage, "previous-image", *previousImage, "reference to previous image")
}
Expand Down
1 change: 1 addition & 0 deletions cmd/lifecycle/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (c *createCmd) DefineFlags() {
cli.FlagLauncherPath(&c.LauncherPath)
cli.FlagLayersDir(&c.LayersDir)
cli.FlagOrderPath(&c.OrderPath)
cli.FlagParallelExport(&c.ParallelExport)
cli.FlagPlatformDir(&c.PlatformDir)
cli.FlagPreviousImage(&c.PreviousImageRef)
cli.FlagProcessType(&c.DefaultProcessType)
Expand Down
67 changes: 47 additions & 20 deletions cmd/lifecycle/exporter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/buildpacks/lifecycle/auth"
"github.com/buildpacks/lifecycle/buildpack"
Expand Down Expand Up @@ -72,6 +74,7 @@ func (e *exportCmd) DefineFlags() {
cli.FlagLaunchCacheDir(&e.LaunchCacheDir)
cli.FlagLauncherPath(&e.LauncherPath)
cli.FlagLayersDir(&e.LayersDir)
cli.FlagParallelExport(&e.ParallelExport)
cli.FlagProcessType(&e.DefaultProcessType)
cli.FlagProjectMetadataPath(&e.ProjectMetadataPath)
cli.FlagReportPath(&e.ReportPath)
Expand Down Expand Up @@ -170,13 +173,20 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz
return err
}

g := new(errgroup.Group)
var ctx context.Context

if e.ParallelExport {
g, ctx = errgroup.WithContext(context.Background())
}
exporter := &phase.Exporter{
Buildpacks: group.Group,
LayerFactory: &layers.Factory{
ArtifactsDir: artifactsDir,
UID: e.UID,
GID: e.GID,
Logger: cmd.DefaultLogger,
Ctx: ctx,
},
Logger: cmd.DefaultLogger,
PlatformAPI: e.PlatformAPI,
Expand All @@ -203,31 +213,48 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz
return err
}

report, err := exporter.Export(phase.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
g.Go(func() error {
report, err := exporter.Export(phase.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
}
if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")
}
return nil
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
}
if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")

if !e.ParallelExport {
if err := g.Wait(); err != nil {
return err
}
}

if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
g.Go(func() error {
if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
}
}
return nil
})

if err = g.Wait(); err != nil {
return err
}

return nil
}

Expand Down
92 changes: 59 additions & 33 deletions layers/factory.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package layers

import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"time"

v1 "github.com/google/go-containerregistry/pkg/v1"

Expand All @@ -20,14 +23,15 @@ const (
ProcessTypesLayerName = "Buildpacks Process Types"
SBOMLayerName = "Software Bill-of-Materials"
SliceLayerName = "Application Slice: %d"
processing = "processing"
)

type Factory struct {
ArtifactsDir string // ArtifactsDir is the directory where layer files are written
UID, GID int // UID and GID are used to normalize layer entries
Logger log.Logger

tarHashes map[string]string // tarHases Stores hashes of layer tarballs for reuse between the export and cache steps.
Ctx context.Context
tarHashes sync.Map // tarHases Stores hashes of layer tarballs for reuse between the export and cache steps.
}

type Layer struct {
Expand All @@ -38,44 +42,66 @@ type Layer struct {
}

func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.NormalizingTarWriter) error) (layer Layer, err error) {
if f.Ctx == nil {
f.Ctx = context.TODO()
}
tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar")
if f.tarHashes == nil {
f.tarHashes = make(map[string]string)
for {
sha, loaded := f.tarHashes.LoadOrStore(tarPath, processing)
if loaded {
select {
case <-f.Ctx.Done():
return Layer{}, f.Ctx.Err()
default:
shaString := sha.(string)
if shaString == processing {
// another goroutine is processing this layer, wait and try again
time.Sleep(500 * time.Millisecond)
continue
}

f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString)
return Layer{
ID: id,
TarPath: tarPath,
Digest: shaString,
History: v1.History{CreatedBy: createdBy},
}, nil
}
}
break
}
if sha, ok := f.tarHashes[tarPath]; ok {
f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, sha)

select {
case <-f.Ctx.Done():
return Layer{}, f.Ctx.Err()
default:
lw, err := newFileLayerWriter(tarPath)
if err != nil {
return Layer{}, err
}
defer func() {
if closeErr := lw.Close(); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't err always nil here?

err = closeErr
}
}()
tw := tarWriter(lw)
if err := addEntries(tw); err != nil {
return Layer{}, err
}

if err := tw.Close(); err != nil {
return Layer{}, err
}
digest := lw.Digest()
f.tarHashes.Store(tarPath, digest)
return Layer{
ID: id,
Digest: digest,
TarPath: tarPath,
Digest: sha,
History: v1.History{CreatedBy: createdBy},
}, nil
}
lw, err := newFileLayerWriter(tarPath)
if err != nil {
return Layer{}, err
}
defer func() {
if closeErr := lw.Close(); err == nil {
err = closeErr
}
}()
tw := tarWriter(lw)
if err := addEntries(tw); err != nil {
return Layer{}, err
}

if err := tw.Close(); err != nil {
return Layer{}, err
}, err
}
digest := lw.Digest()
f.tarHashes[tarPath] = digest
return Layer{
ID: id,
Digest: digest,
TarPath: tarPath,
History: v1.History{CreatedBy: createdBy},
}, err
}

func escape(id string) string {
Expand Down
3 changes: 3 additions & 0 deletions platform/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ const (

// EnvKanikoCacheTTL is the amount of time to persist layers cached by kaniko during the `extend` phase.
EnvKanikoCacheTTL = "CNB_KANIKO_CACHE_TTL"

// EnvParallelExport is a flag used to instruct the lifecycle to export of application image and cache image in parallel, if true.
EnvParallelExport = "CNB_PARALLEL_EXPORT"
)

// DefaultKanikoCacheTTL is the default kaniko cache TTL (2 weeks).
Expand Down
2 changes: 2 additions & 0 deletions platform/lifecycle_inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type LifecycleInputs struct {
GID int
ForceRebase bool
SkipLayers bool
ParallelExport bool
UseDaemon bool
UseLayout bool
AdditionalTags str.Slice // str.Slice satisfies the `Value` interface required by the `flag` package
Expand Down Expand Up @@ -131,6 +132,7 @@ func NewLifecycleInputs(platformAPI *api.Version) *LifecycleInputs {
KanikoDir: "/kaniko",
LaunchCacheDir: os.Getenv(EnvLaunchCacheDir),
SkipLayers: skipLayers,
ParallelExport: boolEnv(EnvParallelExport),

// Images used by the lifecycle during the build

Expand Down
11 changes: 11 additions & 0 deletions platform/resolve_create_inputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func testResolveCreateInputs(platformAPI string) func(t *testing.T, when spec.G,
h.SkipIf(t, api.MustParse(platformAPI).LessThan("0.12"), "")
})

when("parallel export is enabled and cache image ref is blank", func() {
it("warns", func() {
inputs.ParallelExport = true
inputs.CacheImageRef = ""
err := platform.ResolveInputs(platform.Create, inputs, logger)
h.AssertNil(t, err)
expected := "parallel export has been enabled, but it has not taken effect because cache image (-cache-image) has not been specified."
h.AssertLogEntry(t, logHandler, expected)
})
})

when("run image", func() {
when("not provided", func() {
it.Before(func() {
Expand Down
10 changes: 10 additions & 0 deletions platform/resolve_inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger)
CheckLaunchCache,
ValidateImageRefs,
ValidateTargetsAreSameRegistry,
CheckParallelExport,
)
case Build:
// nop
Expand All @@ -47,6 +48,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger)
CheckLaunchCache,
ValidateImageRefs,
ValidateTargetsAreSameRegistry,
CheckParallelExport,
)
case Detect:
// nop
Expand Down Expand Up @@ -226,6 +228,14 @@ func ValidateRebaseRunImage(i *LifecycleInputs, _ log.Logger) error {
}
}

// CheckParallelExport will validate cache image references when parallel export is enabled.
func CheckParallelExport(i *LifecycleInputs, logger log.Logger) error {
if i.ParallelExport && i.CacheImageRef == "" {
logger.Warn("parallel export has been enabled, but it has not taken effect because cache image (-cache-image) has not been specified.")
}
return nil
}

kritkasahni-google marked this conversation as resolved.
Show resolved Hide resolved
// ValidateTargetsAreSameRegistry ensures all output images are on the same registry.
func ValidateTargetsAreSameRegistry(i *LifecycleInputs, _ log.Logger) error {
if i.UseDaemon {
Expand Down