Skip to content

Commit

Permalink
Export cache image and app image in parallel
Browse files Browse the repository at this point in the history
cc - credits Woa [email protected] for initiating this work

Signed-off-by: Kritka Sahni <[email protected]>
  • Loading branch information
kritkasahni-google committed Nov 14, 2023
1 parent dbd27bd commit 9fdf1d7
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 49 deletions.
2 changes: 2 additions & 0 deletions acceptance/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,10 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe
)
h.AssertStringContains(t, output, "Saving "+exportedImageName)

// To detect whether the export of cacheImage and exportedImage is successful
h.Run(t, exec.Command("docker", "pull", exportedImageName))
assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime)
h.Run(t, exec.Command("docker", "pull", cacheImageName))
})

it("is created with empty layer", func() {
Expand Down
5 changes: 5 additions & 0 deletions cmd/lifecycle/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func FlagPlanPath(planPath *string) {
flagSet.StringVar(planPath, "plan", *planPath, "path to plan.toml")
}

// FlagParallelExport parses `parallel` flag
func FlagParallelExport(parallelExport *bool) {
flagSet.BoolVar(parallelExport, "parallel", *parallelExport, "export app image and cache image in parallel")
}

func FlagPlatformDir(platformDir *string) {
flagSet.StringVar(platformDir, "platform", *platformDir, "path to platform directory")
}
Expand Down
1 change: 1 addition & 0 deletions cmd/lifecycle/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (c *createCmd) DefineFlags() {
cli.FlagLauncherPath(&c.LauncherPath)
cli.FlagLayersDir(&c.LayersDir)
cli.FlagOrderPath(&c.OrderPath)
cli.FlagParallelExport(&c.ParallelExport)
cli.FlagPlatformDir(&c.PlatformDir)
cli.FlagPreviousImage(&c.PreviousImageRef)
cli.FlagProcessType(&c.DefaultProcessType)
Expand Down
64 changes: 44 additions & 20 deletions cmd/lifecycle/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (e *exportCmd) DefineFlags() {
cli.FlagLaunchCacheDir(&e.LaunchCacheDir)
cli.FlagLauncherPath(&e.LauncherPath)
cli.FlagLayersDir(&e.LayersDir)
cli.FlagParallelExport(&e.ParallelExport)
cli.FlagProcessType(&e.DefaultProcessType)
cli.FlagProjectMetadataPath(&e.ProjectMetadataPath)
cli.FlagReportPath(&e.ReportPath)
Expand Down Expand Up @@ -170,13 +171,19 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz
return err
}

var g *errgroup.Group
var ctx context.Context
if e.Parallel {
g, ctx = errgroup.WithContext(context.Background())
}
exporter := &phase.Exporter{
Buildpacks: group.Group,
LayerFactory: &layers.Factory{
ArtifactsDir: artifactsDir,
UID: e.UID,
GID: e.GID,
Logger: cmd.DefaultLogger,
Ctx: ctx,
},
Logger: cmd.DefaultLogger,
PlatformAPI: e.PlatformAPI,
Expand All @@ -203,31 +210,48 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz
return err
}

report, err := exporter.Export(phase.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
g.Go(func() error {
report, err := exporter.Export(phase.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
}
if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")
}
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
}
if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")

if !e.parallel {
ctx = nil
if err := g.Wait(); err != nil {
return err
}
}

if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
g.Go(func() error {
if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
}
}
return nil
})

if err := g.Wait(); err != nil {
return err
}

return nil
}

Expand Down
87 changes: 58 additions & 29 deletions layers/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package layers

import (
"context"
"os"
"path/filepath"
"strings"
Expand All @@ -9,6 +10,7 @@ import (

"github.com/buildpacks/lifecycle/archive"
"github.com/buildpacks/lifecycle/log"
"sync"
)

const (
Expand All @@ -26,8 +28,10 @@ type Factory struct {
ArtifactsDir string // ArtifactsDir is the directory where layer files are written
UID, GID int // UID and GID are used to normalize layer entries
Logger log.Logger

tarHashes map[string]string // tarHases Stores hashes of layer tarballs for reuse between the export and cache steps.

// tarHashes stores hashes of layer tarballs for reuse between the export and cache steps.
tarHashes sync.Map
ctx context.Context
}

type Layer struct {
Expand All @@ -38,38 +42,63 @@ type Layer struct {
}

func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.NormalizingTarWriter) error) (layer Layer, err error) {
tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar")
if f.tarHashes == nil {
f.tarHashes = make(map[string]string)
if f.ctx == nil {
f.ctx = context.TODO()
}
if sha, ok := f.tarHashes[tarPath]; ok {
f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, sha)
return Layer{
ID: id,
TarPath: tarPath,
Digest: sha,
History: v1.History{CreatedBy: createdBy},
}, nil
}
lw, err := newFileLayerWriter(tarPath)
if err != nil {
return Layer{}, err
}
defer func() {
if closeErr := lw.Close(); err == nil {
err = closeErr

tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar")
const processing = "processing"
for {
sha, loaded := f.tarHashes.LoadOrStore(tarPath, processing)
if loaded {
select {
case <-ctx.Done():
return nil, fmt.Errorf("layer factory context canceled")
default:
shaString := sha.(string)
if shaString == processing {
// another goroutine is processing this layer, wait and try again
time.Sleep(time.Duration(500 * time.Millisecond))
continue
}

f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString)
return Layer{
ID: id,
TarPath: tarPath,
Digest: shaString,
History: v1.History{CreatedBy: createdBy},
}, nil
}
}
}()
tw := tarWriter(lw)
if err := addEntries(tw); err != nil {
return Layer{}, err
break
}

if err := tw.Close(); err != nil {
return Layer{}, err

select {
case <- ctx.Done():
return fmt.Errorf("layer factory context canceled")
default:
lw, err := newFileLayerWriter(tarPath)
if err != nil {
return Layer{}, err
}
defer func() {
if closeErr := lw.Close(); err == nil {
err = closeErr
}
}()
tw := tarWriter(lw)
if err := addEntries(tw); err != nil {
return Layer{}, err
}

if err := tw.Close(); err != nil {
return Layer{}, err
}
digest := lw.Digest()
f.tarHashes[tarPath] = digest
}
digest := lw.Digest()
f.tarHashes[tarPath] = digest
return Layer{
ID: id,
Digest: digest,
Expand Down
3 changes: 3 additions & 0 deletions platform/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ const (

// EnvKanikoCacheTTL is the amount of time to persist layers cached by kaniko during the `extend` phase.
EnvKanikoCacheTTL = "CNB_KANIKO_CACHE_TTL"

// EnvParallelExport is a flag used to instruct the lifecycle to export of application image and cache image in parallel, if true.
EnvParallelExport = "CNB_PARALLEL_EXPORT"
)

// DefaultKanikoCacheTTL is the default kaniko cache TTL (2 weeks).
Expand Down
2 changes: 2 additions & 0 deletions platform/lifecycle_inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type LifecycleInputs struct {
GID int
ForceRebase bool
SkipLayers bool
ParallelExport bool
UseDaemon bool
UseLayout bool
AdditionalTags str.Slice // str.Slice satisfies the `Value` interface required by the `flag` package
Expand Down Expand Up @@ -131,6 +132,7 @@ func NewLifecycleInputs(platformAPI *api.Version) *LifecycleInputs {
KanikoDir: "/kaniko",
LaunchCacheDir: os.Getenv(EnvLaunchCacheDir),
SkipLayers: skipLayers,
ParallelExport: boolEnv(EnvParallelExport),

// Images used by the lifecycle during the build

Expand Down
11 changes: 11 additions & 0 deletions platform/resolve_create_inputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ func testResolveCreateInputs(platformAPI string) func(t *testing.T, when spec.G,
it.Before(func() {
h.SkipIf(t, api.MustParse(platformAPI).LessThan("0.12"), "")
})

when("parallel export is enabled and cache image ref is blank", func() {
it("warns", func() {
inputs.ParallelExport = true
inputs.CacheImageRef = ""
err := platform.ResolveInputs(platform.Create, inputs, logger)
h.AssertNil(t, err)
expected := "parallel export has been enabled, but it has not taken effect because cache image (-cache-image) has not been specified."
h.AssertLogEntry(t, logHandler, expected)
})
})

when("run image", func() {
when("not provided", func() {
Expand Down
10 changes: 10 additions & 0 deletions platform/resolve_inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger)
CheckLaunchCache,
ValidateImageRefs,
ValidateTargetsAreSameRegistry,
CheckParallelExport,
)
case Detect:
// nop
Expand All @@ -58,6 +59,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger)
CheckLaunchCache,
ValidateImageRefs,
ValidateTargetsAreSameRegistry,
CheckParallelExport,
)
case Extend:
// nop
Expand Down Expand Up @@ -226,6 +228,14 @@ func ValidateRebaseRunImage(i *LifecycleInputs, _ log.Logger) error {
}
}

// CheckParallelExport will validate cache image references when parallel export is enabled.
func CheckParallelExport(i *LifecycleInputs, logger log.Logger) error {
if i.ParallelExport && i.CacheImageRef == "" {
logger.Warn("parallel export has been enabled, but it has not taken effect because cache image (-cache-image) has not been specified.")
}
return nil
}

// ValidateTargetsAreSameRegistry ensures all output images are on the same registry.
func ValidateTargetsAreSameRegistry(i *LifecycleInputs, _ log.Logger) error {
if i.UseDaemon {
Expand Down

0 comments on commit 9fdf1d7

Please sign in to comment.