Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: run more things in parallel #3636

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ require (
github.com/OneOfOne/xxhash v1.2.8
github.com/adrg/xdg v0.5.3
github.com/anchore/archiver/v3 v3.5.3-0.20241210171143-5b1d8d1c7c51
github.com/anchore/go-sync v0.0.0-20250215193218-907b61969935
github.com/hashicorp/hcl/v2 v2.23.0
github.com/magiconair/properties v1.8.9
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj
github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA=
github.com/anchore/go-sync v0.0.0-20250215193218-907b61969935 h1:v7IukOdZdCTsSV2aCxRq6Y6gaaALz8IvcmJzS35mdmw=
github.com/anchore/go-sync v0.0.0-20250215193218-907b61969935/go.mod h1:IUw+ZYPpxADtssCML2cyxaEV0RzfK6PajiBVQyGSyG4=
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8=
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ=
github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods=
Expand Down
7 changes: 5 additions & 2 deletions internal/file/digest.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package file

import (
"context"
"crypto"
"fmt"
"hash"
"io"
"strings"

"github.com/anchore/go-sync"
"github.com/anchore/syft/syft/file"
)

Expand All @@ -21,7 +23,7 @@ func supportedHashAlgorithms() []crypto.Hash {
}
}

func NewDigestsFromFile(closer io.ReadCloser, hashes []crypto.Hash) ([]file.Digest, error) {
func NewDigestsFromFile(ctx context.Context, closer io.ReadCloser, hashes []crypto.Hash) ([]file.Digest, error) {
hashes = NormalizeHashes(hashes)
// create a set of hasher objects tied together with a single writer to feed content into
hashers := make([]hash.Hash, len(hashes))
Expand All @@ -31,7 +33,8 @@ func NewDigestsFromFile(closer io.ReadCloser, hashes []crypto.Hash) ([]file.Dige
writers[idx] = hashers[idx]
}

size, err := io.Copy(io.MultiWriter(writers...), closer)
size, err := io.Copy(sync.ParallelWriter(sync.GetExecutor(ctx, "cpu"), writers...), closer)
// size, err := io.Copy(io.MultiWriter(writers...), closer)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/file/digest_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package file

import (
"context"
"crypto"
"os"
"testing"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestNewDigestsFromFile(t *testing.T) {
fh, err := os.Open(tt.fixture)
require.NoError(t, err)

got, err := NewDigestsFromFile(fh, tt.hashes)
got, err := NewDigestsFromFile(context.TODO(), fh, tt.hashes)
tt.wantErr(t, err)
if err != nil {
return
Expand Down
30 changes: 18 additions & 12 deletions internal/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,11 @@ func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsy
return
}

err := runTaskSafely(ctx, tsk, resolver, s)
unknowns, remainingErrors := unknown.ExtractCoordinateErrors(err)
if len(unknowns) > 0 {
appendUnknowns(s, tsk.Name(), unknowns)
}
if remainingErrors != nil {
withLock(func() {
errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", remainingErrors))
prog.SetError(remainingErrors)
})
}
prog.Increment()
err := RunTask(ctx, tsk, resolver, s, prog)
withLock(func() {
err = multierror.Append(err, fmt.Errorf("failed to run task: %w", err))
errs = multierror.Append(errs, err)
})
}
}()
}
Expand All @@ -78,6 +71,19 @@ func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsy
return errs
}

// RunTask executes a single task against the resolver, recording any "unknown"
// coordinate errors on the SBOM builder and reporting progress/errors to prog.
// Only the remaining (non-unknown) error, if any, is returned to the caller.
func RunTask(ctx context.Context, tsk Task, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error {
	taskErr := runTaskSafely(ctx, tsk, resolver, s)

	// split "unknown" coordinate errors out from hard failures; unknowns are
	// recorded on the SBOM itself rather than failing the run
	unknowns, hardErr := unknown.ExtractCoordinateErrors(taskErr)
	if len(unknowns) > 0 {
		appendUnknowns(s, tsk.Name(), unknowns)
	}
	if hardErr != nil {
		prog.SetError(hardErr)
	}
	prog.Increment()
	return hardErr
}

func appendUnknowns(builder sbomsync.Builder, taskName string, unknowns []unknown.CoordinateError) {
if accessor, ok := builder.(sbomsync.Accessor); ok {
accessor.WriteToSBOM(func(sb *sbom.SBOM) {
Expand Down
4 changes: 2 additions & 2 deletions internal/task/file_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ func newExecutableCatalogerTaskFactory(tags ...string) factory {
}

func newExecutableCatalogerTask(selection file.Selection, cfg executable.Config, tags ...string) Task {
fn := func(_ context.Context, resolver file.Resolver, builder sbomsync.Builder) error {
fn := func(ctx context.Context, resolver file.Resolver, builder sbomsync.Builder) error {
if selection == file.NoFilesSelection {
return nil
}

accessor := builder.(sbomsync.Accessor)

result, err := executable.NewCataloger(cfg).Catalog(resolver)
result, err := executable.NewCataloger(cfg).CatalogCtx(ctx, resolver)

accessor.WriteToSBOM(func(sbom *sbom.SBOM) {
sbom.Artifacts.Executables = result
Expand Down
59 changes: 51 additions & 8 deletions syft/create_sbom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package syft
import (
"context"
"fmt"
"runtime"
"sort"

"github.com/dustin/go-humanize"
"github.com/scylladb/go-set/strset"

"github.com/anchore/go-sync"
"github.com/anchore/syft/internal/bus"
"github.com/anchore/syft/internal/licenses"
"github.com/anchore/syft/internal/sbomsync"
Expand Down Expand Up @@ -62,22 +64,20 @@ func CreateSBOM(ctx context.Context, src source.Source, cfg *CreateSBOMConfig) (
},
}

// inject a single license scanner and content config for all package cataloging tasks into context
licenseScanner, err := licenses.NewDefaultScanner(
licenses.WithIncludeLicenseContent(cfg.Licenses.IncludeUnkownLicenseContent),
licenses.WithCoverage(cfg.Licenses.Coverage),
)
// setup everything we need in context: license scanner, executors, etc.
ctx, err = setupContext(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("could not build licenseScanner for cataloging: %w", err)
return nil, err
}
ctx = licenses.SetContextLicenseScanner(ctx, licenseScanner)

catalogingProgress := monitorCatalogingTask(src.ID(), taskGroups)
packageCatalogingProgress := monitorPackageCatalogingTask()

builder := sbomsync.NewBuilder(&s, monitorPackageCount(packageCatalogingProgress))
for i := range taskGroups {
err := task.NewTaskExecutor(taskGroups[i], cfg.Parallelism).Execute(ctx, resolver, builder, catalogingProgress)
err = sync.Collect(sync.GetExecutor(ctx, "catalog"), sync.ToSeq(taskGroups[i]), nil, func(t task.Task) (any, error) {
return nil, task.RunTask(ctx, t, resolver, builder, catalogingProgress)
})
if err != nil {
// TODO: tie this to the open progress monitors...
return nil, fmt.Errorf("failed to run tasks: %w", err)
Expand All @@ -90,6 +90,49 @@ func CreateSBOM(ctx context.Context, src source.Source, cfg *CreateSBOMConfig) (
return &s, nil
}

// setupContext prepares the context with everything cataloging tasks need:
// bounded parallel executors and a single shared license scanner.
func setupContext(ctx context.Context, cfg *CreateSBOMConfig) (context.Context, error) {
	// executors first, then the license scanner (which may fail to construct)
	return setContextLicenseScanner(setContextExecutors(ctx, cfg), cfg)
}

// setContextLicenseScanner injects a single license scanner and content config
// (shared by all package cataloging tasks) into the context, built from
// cfg.Licenses. Returns an error if the scanner cannot be constructed.
func setContextLicenseScanner(ctx context.Context, cfg *CreateSBOMConfig) (context.Context, error) {
	scanner, err := licenses.NewDefaultScanner(
		licenses.WithIncludeLicenseContent(cfg.Licenses.IncludeUnkownLicenseContent),
		licenses.WithCoverage(cfg.Licenses.Coverage),
	)
	if err != nil {
		return nil, fmt.Errorf("could not build licenseScanner for cataloging: %w", err)
	}
	return licenses.SetContextLicenseScanner(ctx, scanner), nil
}

// setContextExecutors registers a bounded executor in the context for each
// resource dimension we coordinate ("catalog", "cpu", "io", "net"), all sized
// from cfg.Parallelism (cfg may be nil, which means "use defaults").
//
// Executor semantics: 0 == serial (no goroutines), 1 == at most one goroutine,
// negative == unbounded. A user-facing Parallelism of 0 is treated as "pick a
// reasonable max for this system", and 1 is mapped to serial execution to
// avoid goroutine overhead.
func setContextExecutors(ctx context.Context, cfg *CreateSBOMConfig) context.Context {
	requested := 0
	if cfg != nil {
		requested = cfg.Parallelism
	}
	switch requested {
	case 0:
		requested = runtime.NumCPU() * 4 // default: reasonable max for the system
	case 1:
		requested = 0 // run in serial, don't spawn goroutines
	}
	// one executor per dimension so bounds are coordinated independently
	for _, dimension := range []string{"catalog", "cpu", "io", "net"} {
		ctx = sync.SetContextExecutor(ctx, dimension, sync.NewExecutor(requested))
	}
	return ctx
}

func monitorPackageCount(prog *monitor.CatalogerTaskProgress) func(s *sbom.SBOM) {
return func(s *sbom.SBOM) {
count := humanize.Comma(int64(s.Artifacts.Packages.PackageCount()))
Expand Down
6 changes: 1 addition & 5 deletions syft/create_sbom_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func DefaultCreateSBOMConfig() *CreateSBOMConfig {
Packages: pkgcataloging.DefaultConfig(),
Licenses: cataloging.DefaultLicenseConfig(),
Files: filecataloging.DefaultConfig(),
Parallelism: 1,
Parallelism: 0, // use default, based on number of CPUs
packageTaskFactories: task.DefaultPackageTaskFactories(),

// library consumers are free to override the tool values to fit their needs, however, we have some sane defaults
Expand Down Expand Up @@ -91,10 +91,6 @@ func (c *CreateSBOMConfig) WithTool(name, version string, cfg ...any) *CreateSBO

// WithParallelism allows for setting the number of concurrent cataloging tasks that can be performed at once
func (c *CreateSBOMConfig) WithParallelism(p int) *CreateSBOMConfig {
if p < 1 {
// TODO: warn?
p = 1
}
c.Parallelism = p
return c
}
Expand Down
25 changes: 15 additions & 10 deletions syft/file/cataloger/executable/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executable

import (
"bytes"
"context"
"debug/elf"
"debug/macho"
"encoding/binary"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/bmatcuk/doublestar/v4"
"github.com/dustin/go-humanize"

"github.com/anchore/go-sync"
"github.com/anchore/syft/internal"
"github.com/anchore/syft/internal/bus"
"github.com/anchore/syft/internal/log"
Expand Down Expand Up @@ -45,9 +47,11 @@ func NewCataloger(cfg Config) *Cataloger {
}
}

func (i *Cataloger) Catalog(resolver file.Resolver) (map[file.Coordinates]file.Executable, error) {
var errs error
// func (i *Cataloger) Catalog(resolver file.Resolver) (map[file.Coordinates]file.Executable, error) {
// return i.CatalogCtx(context.Background(), resolver)
//}

func (i *Cataloger) CatalogCtx(ctx context.Context, resolver file.Resolver) (map[file.Coordinates]file.Executable, error) {
locs, err := resolver.FilesByMIMEType(i.config.MIMETypes...)
if err != nil {
return nil, fmt.Errorf("unable to get file locations for binaries: %w", err)
Expand All @@ -61,19 +65,20 @@ func (i *Cataloger) Catalog(resolver file.Resolver) (map[file.Coordinates]file.E
prog := catalogingProgress(int64(len(locs)))

results := make(map[file.Coordinates]file.Executable)
for _, loc := range locs {
errs := sync.Collect(sync.GetExecutor(ctx, "io"), sync.ToSeq(locs), func(loc file.Location, exec *file.Executable) {
if exec != nil {
prog.Increment()
results[loc.Coordinates] = *exec
}
}, func(loc file.Location) (*file.Executable, error) {
prog.AtomicStage.Set(loc.Path())

exec, err := processExecutableLocation(loc, resolver)
if err != nil {
errs = unknown.Append(errs, loc, err)
err = unknown.New(loc, err)
}

if exec != nil {
prog.Increment()
results[loc.Coordinates] = *exec
}
}
return exec, err
})

log.Debugf("executable cataloger processed %d files", len(results))

Expand Down
29 changes: 16 additions & 13 deletions syft/file/cataloger/filedigest/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/dustin/go-humanize"

"github.com/anchore/go-sync"
stereoscopeFile "github.com/anchore/stereoscope/pkg/file"
"github.com/anchore/syft/internal"
"github.com/anchore/syft/internal/bus"
Expand All @@ -34,7 +35,6 @@ func NewCataloger(hashes []crypto.Hash) *Cataloger {
func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordinates ...file.Coordinates) (map[file.Coordinates][]file.Digest, error) {
results := make(map[file.Coordinates][]file.Digest)
var locations []file.Location
var errs error

if len(coordinates) == 0 {
locations = intCataloger.AllRegularFiles(ctx, resolver)
Expand All @@ -49,41 +49,44 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin
}

prog := catalogingProgress(int64(len(locations)))
for _, location := range locations {
result, err := i.catalogLocation(resolver, location)

err := sync.Collect(sync.GetExecutor(ctx, "io"), sync.ToSeq(locations), func(location file.Location, digests []file.Digest) {
if len(digests) > 0 {
results[location.Coordinates] = digests
}
}, func(location file.Location) ([]file.Digest, error) {
result, err := i.catalogLocation(ctx, resolver, location)

if errors.Is(err, ErrUndigestableFile) {
continue
return nil, nil
}

prog.AtomicStage.Set(location.Path())

if internal.IsErrPathPermission(err) {
log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err)
errs = unknown.Append(errs, location, err)
continue
return nil, unknown.New(location, err)
}

if err != nil {
prog.SetError(err)
errs = unknown.Append(errs, location, err)
continue
return nil, unknown.New(location, err)
}

prog.Increment()

results[location.Coordinates] = result
}
return result, nil
})

log.Debugf("file digests cataloger processed %d files", prog.Current())

prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current())))
prog.SetCompleted()

return results, errs
return results, err
}

func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) {
func (i *Cataloger) catalogLocation(ctx context.Context, resolver file.Resolver, location file.Location) ([]file.Digest, error) {
meta, err := resolver.FileMetadataByLocation(location)
if err != nil {
return nil, err
Expand All @@ -100,7 +103,7 @@ func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Locati
}
defer internal.CloseAndLogError(contentReader, location.AccessPath)

digests, err := intFile.NewDigestsFromFile(contentReader, i.hashes)
digests, err := intFile.NewDigestsFromFile(ctx, contentReader, i.hashes)
if err != nil {
return nil, internal.ErrPath{Context: "digests-cataloger", Path: location.RealPath, Err: err}
}
Expand Down
4 changes: 2 additions & 2 deletions syft/file/cataloger/filedigest/cataloger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func TestDigestsCataloger(t *testing.T) {
name: "md5",
digests: []crypto.Hash{crypto.MD5},
files: []string{"test-fixtures/last/empty/empty", "test-fixtures/last/path.txt"},
expected: testDigests(t, "test-fixtures/last", []string{"empty/empty", "path.txt"}, crypto.MD5),
expected: testDigests(t, "test-fixtures/last", []string{"path.txt"}, crypto.MD5),
},
{
name: "md5-sha1-sha256",
digests: []crypto.Hash{crypto.MD5, crypto.SHA1, crypto.SHA256},
files: []string{"test-fixtures/last/empty/empty", "test-fixtures/last/path.txt"},
expected: testDigests(t, "test-fixtures/last", []string{"empty/empty", "path.txt"}, crypto.MD5, crypto.SHA1, crypto.SHA256),
expected: testDigests(t, "test-fixtures/last", []string{"path.txt"}, crypto.MD5, crypto.SHA1, crypto.SHA256),
},
}

Expand Down
Loading
Loading