Commit

High blob count optimizations part 1 (to master) (#928)
* Performance-optimize simple include-pattern filters, by adding them to the blob search prefix

* Drop to longer intervals between progress updates if there are over a million files
JohnRusk authored Mar 30, 2020
1 parent 7eb7b49 commit 80b0cd3
Showing 11 changed files with 134 additions and 12 deletions.
11 changes: 11 additions & 0 deletions ChangeLog.md
@@ -1,6 +1,17 @@

# Change Log

## Version 10.3.61 (private drop)

### Performance optimizations

1. Any `--include-pattern` that contains only `*` wildcards will be performance-optimized when querying blob storage. The section before the
first `*` will be used as a server-side prefix, to filter the search results more efficiently. E.g. `--include-pattern abc*` will be implemented
as a prefix search for "abc". In a more complex example, `--include-pattern abc*123` will be implemented as a prefix search for "abc", followed
by client-side filtering to find exact matches to `abc*123`. (A sketch of the idea follows this list.)
2. When processing over a million files, AzCopy will report its progress once every 2 minutes instead of once every 2 seconds. This reduces the CPU
load associated with progress reporting.
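A minimal sketch of optimization 1 above, for illustration only. The helper name `prefixFor` is hypothetical and simply mirrors the filter logic added later in this commit; it is not part of AzCopy.

package main

import (
	"fmt"
	"path"
	"strings"
)

// prefixFor returns the literal section of the pattern before its first "*",
// or "" when the pattern uses wildcards other than "*" (too complex for a prefix).
func prefixFor(pattern string) string {
	if strings.ContainsAny(pattern, "?[\\") {
		return ""
	}
	return strings.Split(pattern, "*")[0]
}

func main() {
	pattern := "abc*123"
	fmt.Println(prefixFor(pattern)) // "abc" -> sent to the service as a listing prefix

	// the prefix narrows the server-side search; exact matching still happens client-side
	for _, name := range []string{"abc_backup_123", "abc_other"} {
		matched, _ := path.Match(pattern, name)
		fmt.Printf("%s matches %s: %v\n", name, pattern, matched) // true, then false
	}
}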

## Version 10.3.4

### New features
5 changes: 4 additions & 1 deletion cmd/copy.go
@@ -1092,14 +1092,15 @@ func (cca *cookedCopyCmdArgs) getSuccessExitCode() common.ExitCode {
}
}

func (cca *cookedCopyCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) {
func (cca *cookedCopyCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) (totalKnownCount uint32) {
// fetch a job status
var summary common.ListJobSummaryResponse
Rpc(common.ERpcCmd.ListJobSummary(), &cca.jobID, &summary)
summary.IsCleanupJob = cca.isCleanupJob // only FE knows this, so we can only set it here
cleanupStatusString := fmt.Sprintf("Cleanup %v/%v", summary.TransfersCompleted, summary.TotalTransfers)

jobDone := summary.JobStatus.IsJobDone()
totalKnownCount = summary.TotalTransfers

// if json is not desired, and job is done, then we generate a special end message to conclude the job
duration := time.Now().Sub(cca.jobStartTime) // report the total run time of the job
@@ -1213,6 +1214,8 @@ Final Job Status: %v%s%s
summary.TransfersSkipped, summary.TotalTransfers, scanningString, perfString, throughputString, diskString)
}
})

return
}

func formatPerfAdvice(advice []common.PerformanceAdvice) string {
7 changes: 7 additions & 0 deletions cmd/copyEnumeratorInit.go
@@ -309,6 +309,13 @@ func (cca *cookedCopyCmdArgs) initModularFilters() []objectFilter {
filters = append(filters, buildAttrFilters(cca.excludeFileAttributes, cca.source, false)...)
}

// finally, log any search prefix computed from these
if ste.JobsAdmin != nil {
if prefixFilter := filterSet(filters).GetEnumerationPreFilter(cca.recursive); prefixFilter != "" {
ste.JobsAdmin.LogToJobLog("Search prefix, which may be used to optimize scanning, is: " + prefixFilter) // "May be used" because we don't know here which enumerators will use it
}
}

return filters
}

4 changes: 3 additions & 1 deletion cmd/jobsResume.go
@@ -81,11 +81,12 @@ func (cca *resumeJobController) Cancel(lcm common.LifecycleMgr) {
}

// TODO: can we combine this with the copy one (and the sync one?)
func (cca *resumeJobController) ReportProgressOrExit(lcm common.LifecycleMgr) {
func (cca *resumeJobController) ReportProgressOrExit(lcm common.LifecycleMgr) (totalKnownCount uint32) {
// fetch a job status
var summary common.ListJobSummaryResponse
Rpc(common.ERpcCmd.ListJobSummary(), &cca.jobID, &summary)
jobDone := summary.JobStatus.IsJobDone()
totalKnownCount = summary.TotalTransfers

// if json is not desired, and job is done, then we generate a special end message to conclude the job
duration := time.Now().Sub(cca.jobStartTime) // report the total run time of the job
@@ -159,6 +160,7 @@ func (cca *resumeJobController) ReportProgressOrExit(lcm common.LifecycleMgr) {
summary.TransfersSkipped, summary.TotalTransfers, scanningString, perfString, throughputString, diskString)
}
})
return
}

func init() {
5 changes: 4 additions & 1 deletion cmd/sync.go
@@ -380,7 +380,7 @@ func (cca *cookedSyncCmdArgs) getJsonOfSyncJobSummary(summary common.ListJobSumm
return string(jsonOutput)
}

func (cca *cookedSyncCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) {
func (cca *cookedSyncCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) (totalKnownCount uint32) {
duration := time.Now().Sub(cca.jobStartTime) // report the total run time of the job
var summary common.ListJobSummaryResponse
var throughput float64
@@ -390,6 +390,7 @@ func (cca *cookedSyncCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) {
if cca.firstPartOrdered() {
Rpc(common.ERpcCmd.ListJobSummary(), &cca.jobID, &summary)
jobDone = summary.JobStatus.IsJobDone()
totalKnownCount = summary.TotalTransfers

// compute the average throughput for the last time interval
bytesInMb := float64(float64(summary.BytesOverWire-cca.intervalBytesTransferred) * 8 / float64(base10Mega))
@@ -472,6 +473,8 @@ Final Job Status: %v%s%s
summary.TotalTransfers-summary.TransfersCompleted-summary.TransfersFailed,
summary.TotalTransfers, perfString, ste.ToFixed(throughput, 4), diskString)
})

return
}

func (cca *cookedSyncCmdArgs) process() (err error) {
7 changes: 7 additions & 0 deletions cmd/syncEnumerator.go
@@ -27,6 +27,7 @@ import (
"sync/atomic"

"github.com/Azure/azure-storage-azcopy/common"
"github.com/Azure/azure-storage-azcopy/ste"
)

// -------------------------------------- Implemented Enumerators -------------------------------------- \\
@@ -88,6 +89,12 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
excludeAttrFilters := buildAttrFilters(cca.excludeFileAttributes, src, false)
filters = append(filters, excludeAttrFilters...)
}
// after making all filters, log any search prefix computed from them
if ste.JobsAdmin != nil {
if prefixFilter := filterSet(filters).GetEnumerationPreFilter(cca.recursive); prefixFilter != "" {
ste.JobsAdmin.LogToJobLog("Search prefix, which may be used to optimize scanning, is: " + prefixFilter) // "May be used" because we don't know here which enumerators will use it
}
}

// set up the comparator so that the source/destination can be compared
indexer := newObjectIndexer()
4 changes: 4 additions & 0 deletions cmd/zc_enumerator.go
@@ -457,6 +457,10 @@ type objectFilter interface {
doesPass(storedObject storedObject) bool
}

type preFilterProvider interface {
getEnumerationPreFilter() string
}

// -------------------------------------- Generic Enumerators -------------------------------------- \\
// the following enumerators must be instantiated with configurations
// they define the work flow in the most generic terms
55 changes: 54 additions & 1 deletion cmd/zc_filter.go
@@ -128,7 +128,7 @@ func (f *includeFilter) doesPass(storedObject storedObject) bool {
matched := false

var err error
matched, err = path.Match(pattern, checkItem)
matched, err = path.Match(pattern, checkItem) // note: getEnumerationPreFilter below encodes assumptions about the valid wildcards used here

// if the pattern failed to match with an error, then we assume the pattern is invalid
// and ignore it
@@ -146,6 +146,24 @@ func (f *includeFilter) doesPass(storedObject storedObject) bool {
return false
}

// getEnumerationPreFilter returns a prefix, if any, which can be used service-side to pre-select
// things that will pass the filter. E.g. if there's exactly one include pattern, and it is
// "foo*bar", then this routine will return "foo", since only things starting with "foo" can pass the filters.
// Service side enumeration code can be given that prefix, to optimize the enumeration.
func (f *includeFilter) getEnumerationPreFilter() string {
if len(f.patterns) == 1 {
pat := f.patterns[0]
if strings.ContainsAny(pat, "?[\\") {
// this pattern doesn't just use a *, so it's too complex for us to optimize with a prefix
return ""
}
return strings.Split(pat, "*")[0]
} else {
// for simplicity, we won't even try computing a common prefix for all patterns (even though that might help in theory in some cases)
return ""
}
}

func buildIncludeFilters(patterns []string) []objectFilter {
validPatterns := make([]string, 0)
for _, pattern := range patterns {
@@ -156,3 +174,38 @@

return []objectFilter{&includeFilter{patterns: validPatterns}}
}

type filterSet []objectFilter

// GetEnumerationPreFilter returns a prefix that is common to all the include filters, or "" if no such prefix can
// be found. (The implementation may return "" even in cases where such a prefix does exist, but in at least the simplest
// cases, it should return a non-empty prefix.)
// The result can be used to optimize enumeration, since anything without this prefix will fail the filterSet
func (fs filterSet) GetEnumerationPreFilter(recursive bool) string {
if recursive {
return ""
// we don't/can't support recursive cases yet, with a strict prefix-based search.
// Because if the filter is, say "a*", then a prefix of "a"
// will find: enumerationroot/afoo and enumerationroot/abar
// but it will not find: enumerationroot/virtualdir/afoo
// even though we want --include-pattern to find that.
// So, in recursive cases, we just don't use this prefix-based optimization.
// TODO: consider whether we need to support some way to separately invoke prefix-based optimization
// and filtering. E.g. by a directory-by-directory enumeration (with prefix only within directory),
// using the prefix feature in ListBlobs.
}
prefix := ""
for _, f := range fs {
if participatingFilter, ok := f.(preFilterProvider); ok {
// this filter knows how to participate in our scheme
if prefix == "" {
prefix = participatingFilter.getEnumerationPreFilter()
} else {
// prefix already has a value, which means there must be two participating filters, and we can't handle that.
// Normally this won't happen, because there's only one includeFilter no matter how many include patterns have been supplied.
return ""
}
}
}
return prefix
}
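A hypothetical example-style usage of the prefix API added above. This is not part of the commit; it assumes it would live in package cmd alongside this code, with "fmt" imported.

func ExampleGetEnumerationPreFilter() {
	// one include pattern built the normal way
	filters := buildIncludeFilters([]string{"foo*bar"})

	fmt.Println(filterSet(filters).GetEnumerationPreFilter(false)) // "foo": usable as a server-side listing prefix
	fmt.Println(filterSet(filters).GetEnumerationPreFilter(true))  // "": recursive enumeration skips the prefix optimization
}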
8 changes: 7 additions & 1 deletion cmd/zc_traverser_blob.go
@@ -142,11 +142,17 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro
searchPrefix += common.AZCOPY_PATH_SEPARATOR_STRING
}

// as a performance optimization, get an extra prefix to do pre-filtering. It's typically the start portion of a blob name.
extraSearchPrefix := filterSet(filters).GetEnumerationPreFilter(t.recursive)

for marker := (azblob.Marker{}); marker.NotDone(); {

// see the TO DO in GetEnumerationPreFilter if/when we make this more directory-aware

// look for all blobs that start with the prefix
// TODO optimize for the case where recursive is off
listBlob, err := containerURL.ListBlobsFlatSegment(t.ctx, marker,
azblob.ListBlobsSegmentOptions{Prefix: searchPrefix, Details: azblob.BlobListingDetails{Metadata: true}})
azblob.ListBlobsSegmentOptions{Prefix: searchPrefix + extraSearchPrefix, Details: azblob.BlobListingDetails{Metadata: true}})
if err != nil {
return fmt.Errorf("cannot list blobs. Failed with error %s", err.Error())
}
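For intuition, a small sketch of how the two prefixes combine in the listing call above. The directory and pattern values are made up, and the helper is purely illustrative.

func exampleCombinedPrefix() string {
	searchPrefix := "virtualdir/" // derived from the source URL's virtual directory path
	extraSearchPrefix := "abc"    // from a single include pattern like "abc*", non-recursive case only
	// ListBlobsFlatSegment is asked only for names starting with "virtualdir/abc", so blobs such as
	// "virtualdir/other.txt" are never returned and never need client-side filtering.
	return searchPrefix + extraSearchPrefix
}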
38 changes: 32 additions & 6 deletions common/lifecyleMgr.go
@@ -422,8 +422,8 @@ func (lcm *lifecycleMgr) processTextOutput(msgToOutput outputMessage) {

// for the lifecycleMgr to babysit a job, it must be given a controller to get information about the job
type WorkController interface {
Cancel(mgr LifecycleMgr) // handle to cancel the work
ReportProgressOrExit(mgr LifecycleMgr) // print the progress status, optionally exit the application if work is done
Cancel(mgr LifecycleMgr) // handle to cancel the work
ReportProgressOrExit(mgr LifecycleMgr) (totalKnownCount uint32) // print the progress status, optionally exit the application if work is done
}

// AllowReinitiateProgressReporting must be called before running a cleanup job, to allow the initiation of that job's
@@ -441,20 +441,46 @@ func (lcm *lifecycleMgr) InitiateProgressReporting(jc WorkController) {
// this go routine never returns
// it will terminate the whole process eventually when the work is complete
go func() {
const progressFrequencyThreshold = 1000000
var oldCount, newCount uint32

// cancelChannel will be notified when the process receives os.Interrupt and os.Kill signals
signal.Notify(lcm.cancelChannel, os.Interrupt, os.Kill)

cancelCalled := false

doCancel := func() {
cancelCalled = true
lcm.Info("Cancellation requested. Beginning clean shutdown...")
jc.Cancel(lcm)
}

for {
select {
case <-lcm.cancelChannel:
lcm.Info("Cancellation requested. Beginning clean shutdown...")
jc.Cancel(lcm)
doCancel()
default:
jc.ReportProgressOrExit(lcm)
newCount = jc.ReportProgressOrExit(lcm)
}

wait := 2 * time.Second
if newCount >= progressFrequencyThreshold && !cancelCalled {
// report less on progress - to save on the CPU costs of doing so and because, if there are this many files,
// it's going to be a long job anyway, so no need to report so often
wait = 2 * time.Minute
if oldCount < progressFrequencyThreshold {
lcm.Info(fmt.Sprintf("Reducing progress output frequency to %v, because there are over %d files", wait, progressFrequencyThreshold))
}
}

// wait a bit before fetching job status again, as fetching has costs associated with it on the backend
time.Sleep(2 * time.Second)
select {
case <-lcm.cancelChannel:
doCancel()
case <-time.After(wait):
}

oldCount = newCount
}
}()
}
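The interval selection above, extracted into a minimal standalone helper for clarity. This helper does not exist in the codebase; it is only a restatement of the logic shown in the loop.

// progressInterval returns how long to wait between progress reports,
// given the total number of transfers known so far.
func progressInterval(totalKnownCount uint32) time.Duration {
	const progressFrequencyThreshold = 1000000
	if totalKnownCount >= progressFrequencyThreshold {
		return 2 * time.Minute // over a million known transfers: report every 2 minutes to cut CPU cost
	}
	return 2 * time.Second // normal case: report every 2 seconds
}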
2 changes: 1 addition & 1 deletion common/version.go
@@ -1,6 +1,6 @@
package common

const AzcopyVersion = "10.3.4"
const AzcopyVersion = "10.3.61" // using the 6x range for this private drop and any related ones
const UserAgent = "AzCopy/" + AzcopyVersion
const S3ImportUserAgent = "S3Import " + UserAgent
const BenchmarkUserAgent = "Benchmark " + UserAgent
