refactor reindex and add s3 tags for pushing, skip reindex for push … #460

Open
wants to merge 1 commit into master
63 changes: 36 additions & 27 deletions cmd/helm-s3/push.go
@@ -18,6 +18,7 @@ const pushDesc = `This command uploads a chart to the repository.
'helm s3 push' takes two arguments:
- PATH - path to the chart file,
- REPO - target repository.
S3 object tags for the uploaded chart can be set with the --tags flag.
`

const pushExample = ` helm s3 push ./epicservice-0.5.1.tgz my-repo - uploads chart file 'epicservice-0.5.1.tgz' from the current directory to the repository with name 'my-repo'.`
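For illustration, the new flags added in this change could be combined as follows; the chart name matches the existing example, while the tag key/value are hypothetical and the exact --tags string format is determined by the awss3 package, which is outside this diff:

 helm s3 push ./epicservice-0.5.1.tgz my-repo --tags "owner=team-a" --skip-reindex - uploads the chart, applies the given S3 object tags, and does not update the repository index.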
@@ -38,6 +39,8 @@ func newPushCommand(opts *options) *cobra.Command {
force: false,
ignoreIfExists: false,
relative: false,
skipReindex: false,
tags: "",
}

cmd := &cobra.Command{
@@ -69,6 +72,8 @@ func newPushCommand(opts *options) *cobra.Command {
flags.BoolVar(&act.force, "force", act.force, "Replace the chart if it already exists. This can cause the repository to lose existing charts; use it with care.")
flags.BoolVar(&act.ignoreIfExists, "ignore-if-exists", act.ignoreIfExists, "If the chart already exists, exit normally and do not trigger an error.")
flags.BoolVar(&act.relative, "relative", act.relative, "Use relative chart URL in the index instead of absolute.")
flags.BoolVar(&act.skipReindex, "skip-reindex", act.skipReindex, "Skip reindex after pushing the chart.")
flags.StringVar(&act.tags, "tags", act.tags, "S3 object tags.")

// We don't use cobra's feature
//
@@ -99,6 +104,8 @@ type pushAction struct {
force bool
ignoreIfExists bool
relative bool
skipReindex bool
tags string
}

func (act *pushAction) run(ctx context.Context) error {
@@ -186,7 +193,7 @@ func (act *pushAction) run(ctx context.Context) error {
if err != nil {
return err
}
if _, err := storage.PutChart(ctx, repoEntry.URL()+"/"+fname, fchart, string(chartMetaJSON), act.acl, hash, act.contentType); err != nil {
if _, err := storage.PutChart(ctx, repoEntry.URL()+"/"+fname, fchart, string(chartMetaJSON), act.acl, hash, act.contentType, act.tags); err != nil {
return errors.WithMessage(err, "upload chart to s3")
}
}
@@ -201,40 +208,42 @@ func (act *pushAction) run(ctx context.Context) error {
if err != nil {
return errors.WithMessage(err, "fetch current repo index")
}
if !act.skipReindex {
idx := helmutil.NewIndex()
if err := idx.UnmarshalBinary(b); err != nil {
return errors.WithMessage(err, "load index from downloaded file")
}

idx := helmutil.NewIndex()
if err := idx.UnmarshalBinary(b); err != nil {
return errors.WithMessage(err, "load index from downloaded file")
}

baseURL := repoEntry.URL()
if act.relative {
baseURL = ""
}

filename := escapeIfRelative(fname, act.relative)
baseURL := repoEntry.URL()
if act.relative {
baseURL = ""
}

if err := idx.AddOrReplace(chart.Metadata().Value(), filename, baseURL, hash); err != nil {
return errors.WithMessage(err, "add/replace chart in the index")
}
idx.SortEntries()
idx.UpdateGeneratedTime()
filename := escapeIfRelative(fname, act.relative)

idxReader, err := idx.Reader()
if err != nil {
return errors.WithMessage(err, "get index reader")
}
if err := idx.AddOrReplace(chart.Metadata().Value(), filename, baseURL, hash); err != nil {
return errors.WithMessage(err, "add/replace chart in the index")
}
idx.SortEntries()
idx.UpdateGeneratedTime()

if !act.dryRun {
if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, idxReader); err != nil {
return errors.WithMessage(err, "upload index to s3")
idxReader, err := idx.Reader()
if err != nil {
return errors.WithMessage(err, "get index reader")
}

if err := idx.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil {
return errors.WithMessage(err, "update local index")
if !act.dryRun {
if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, idxReader); err != nil {
return errors.WithMessage(err, "upload index to s3")
}

if err := idx.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil {
return errors.WithMessage(err, "update local index")
}
}
} else {
act.printer.Printf("[DEBUG] Skipping reindex cause skipReindex is set to %b.\n", act.skipReindex)
}

act.printer.Printf("Successfully uploaded the chart to the repository.\n")
return nil
}
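Taken together with the reindex command below, --skip-reindex suggests a bulk-upload workflow; a sketch with hypothetical chart names:

 helm s3 push ./chart-a-0.1.0.tgz my-repo --skip-reindex
 helm s3 push ./chart-b-0.2.0.tgz my-repo --skip-reindex
 helm s3 reindex my-repo - rebuilds the index once after all charts are uploaded.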
122 changes: 92 additions & 30 deletions cmd/helm-s3/reindex.go
@@ -1,15 +1,23 @@
package main

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"sync"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"

"github.com/hypnoglow/helm-s3/internal/awss3"
"github.com/hypnoglow/helm-s3/internal/awsutil"
"github.com/hypnoglow/helm-s3/internal/helmutil"
log "github.com/sirupsen/logrus"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/repo"
)

const reindexDesc = `This command performs a reindex of the repository.
@@ -19,6 +27,7 @@ const reindexDesc = `This command performs a reindex of the repository.
`

const reindexExample = ` helm s3 reindex my-repo - performs a reindex of the repository with name 'my-repo'.`
const batchSize = 1000
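// Illustrative only (hypothetical repository name): the new --dry-run flag added below can
// be exercised as 'helm s3 reindex my-repo --dry-run', which rebuilds and writes the local
// index cache but does not upload it to S3.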

func newReindexCommand(opts *options) *cobra.Command {
act := &reindexAction{
@@ -27,6 +36,7 @@ func newReindexCommand(opts *options) *cobra.Command {
verbose: false,
repoName: "",
relative: false,
dryRun: false,
}

cmd := &cobra.Command{
@@ -50,6 +60,7 @@ func newReindexCommand(opts *options) *cobra.Command {

flags := cmd.Flags()
flags.BoolVar(&act.relative, "relative", act.relative, "Use relative chart URLs in the index instead of absolute.")
flags.BoolVar(&act.dryRun, "dry-run", act.dryRun, "Simulate the reindex without uploading the rebuilt index to the repository.")

return cmd
}
@@ -69,9 +80,15 @@ type reindexAction struct {
// flags

relative bool

dryRun bool
}

func (act *reindexAction) run(ctx context.Context) error {
start := time.Now()
log.Infof("Starting reindex for %s", start)
act.printer.Printf("[DEBUG] Starting reindex.\n")

repoEntry, err := helmutil.LookupRepoEntry(act.repoName)
if err != nil {
return err
@@ -83,52 +100,97 @@ func (act *reindexAction) run(ctx context.Context) error {
}
storage := awss3.New(sess)

items, errs := storage.Traverse(ctx, repoEntry.URL())
items, _ := storage.Traverse(ctx, repoEntry.URL())

builtIndex := make(chan helmutil.Index, 1)
go func() {
idx := helmutil.NewIndex()
for item := range items {
baseURL := repoEntry.URL()
if act.relative {
baseURL = ""
}
// Use a buffered channel to collect the per-batch indexes built concurrently
builtIndex := make(chan *repo.IndexFile, len(items)/batchSize+1)
var wg sync.WaitGroup

if act.verbose {
act.printer.Printf("[DEBUG] Adding %s to index.\n", item.Filename)
}
act.printer.Printf("[DEBUG] Creating split indexes.\n")

// Process items in batches of batchSize.
log.Infof("Total items: %d", len(items))
log.Infof("Processing items in batches of %d", batchSize)
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}

filename := escapeIfRelative(item.Filename, act.relative)
wg.Add(1)
go func(batch []awss3.ChartInfo) {
defer wg.Done()
idx := repo.NewIndexFile()

if err := idx.Add(item.Meta.Value(), filename, baseURL, item.Hash); err != nil {
act.printer.PrintErrf("[ERROR] failed to add chart to the index: %s", err)
for _, item := range batch {
baseURL := repoEntry.URL()
if act.relative {
baseURL = ""
}

if act.verbose {
act.printer.Printf("[DEBUG] Adding %s to index.\n", item.Filename)
}

filename := escapeIfRelative(item.Filename, act.relative)

if err := idx.MustAdd(item.Meta.Value().(*chart.Metadata), filename, baseURL, item.Hash); err != nil {
act.printer.PrintErrf("[ERROR] failed to add chart to the index: %s", err)
}
}
}
idx.SortEntries()
idx.UpdateGeneratedTime()

builtIndex <- idx
}()
builtIndex <- idx
}(items[i:end])
}

for err = range errs {
return fmt.Errorf("traverse the chart repository: %v", err)
log.Info("Waiting for all goroutines to finish")
// Wait for all goroutines to finish
wg.Wait()
close(builtIndex)

log.Info("Processing indexes")
// Merge the individual index files into a single index file
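// Note: repo.IndexFile.Merge only adds entries that are not already present in the
// receiver (existing records are preserved), so this merge assumes the per-batch
// indexes are disjoint, i.e. each chart version was listed in exactly one batch.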
finalIndex := repo.NewIndexFile()
for idx := range builtIndex {
finalIndex.Merge(idx)
}

finalIndex.SortEntries()

if err := finalIndex.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil {
return errors.WithMessage(err, "update local index")
}
log.Infof("Index file written to %s", repoEntry.CacheFile())

idx := <-builtIndex
file, err := os.Open(repoEntry.CacheFile())
if err != nil {
return errors.Wrap(err, "open index file")
}
defer file.Close()

r, err := idx.Reader()
// Get the file size
stat, err := file.Stat()
if err != nil {
return errors.Wrap(err, "get index reader")
return errors.Wrap(err, "get file stats")
}

if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, r); err != nil {
return errors.Wrap(err, "upload index to the repository")
// Read the file into a byte slice
ra := make([]byte, stat.Size())
if _, err := io.ReadFull(bufio.NewReader(file), ra); err != nil {
return errors.Wrap(err, "read index file")
}

if err := idx.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil {
return errors.WithMessage(err, "update local index")
r := bytes.NewReader(ra)
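// An equivalent, shorter sketch (assuming nothing else needs the *os.File handle):
//
//   ra, err := os.ReadFile(repoEntry.CacheFile())
//   if err != nil {
//       return errors.Wrap(err, "read index file")
//   }
//   r := bytes.NewReader(ra)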

if !act.dryRun {
if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, r); err != nil {
return errors.Wrap(err, "upload index to the repository")
}
} else {
act.printer.Printf("[DEBUG] Dry run, not pushing index to the repository.\n")
}

act.printer.Printf("Repository %s was successfully reindexed.\n", act.repoName)
log.Infof("Reindex done in %s", time.Since(start))
return nil
}