From a403ffcec224d6edf76ce4c2ca82743837693b6c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 2 Sep 2023 12:51:10 +0900 Subject: [PATCH 01/12] [attempt by ycombinator] concurrent dcopy (#122) * Use worker pool * Introduce Concurrency() functional option * Collect all errors * Simplify type of Concurrency option * Make channels buffered * Make input channel unbuffered * Logging for debugging * 10x channel capacities * 10x concurrency * 100x concurrency * 100x in channel * Make output channel 1000x * Reducing numWorkers multiplier by 1/10 * Removing numWorkers multiplier * Reducing in channel capacity by 1/10 * Reducing output channel capacity by 1/10 * Reducing output channel capacity by further 1/10 * Remove input channel capacity multiplier * Removing multiplication factor for output channel capacity * Remove debugging statement --------- Co-authored-by: Shaunak Kashyap --- copy.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++-------- go.mod | 1 + go.sum | 6 ++++ options.go | 5 ++++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/copy.go b/copy.go index 085db78..560e9eb 100644 --- a/copy.go +++ b/copy.go @@ -1,11 +1,13 @@ package copy import ( + "go.uber.org/multierr" "io" "io/fs" "io/ioutil" "os" "path/filepath" + "sync" "time" ) @@ -18,36 +20,57 @@ type timespec struct { // Copy copies src to dest, doesn't matter if src is a directory or a file. func Copy(src, dest string, opts ...Options) error { opt := assureOptions(src, dest, opts...) + + var numCopyWorkers uint = 1 + if opt.Concurrency > 1 { + numCopyWorkers = opt.Concurrency + } + + inCh := make(chan workerInput, numCopyWorkers) + outCh := make(chan workerOutput, numCopyWorkers) + errCh := make(chan error) + go startWorkers(numCopyWorkers, inCh, outCh) + go processResults(outCh, errCh) + if opt.FS != nil { info, err := fs.Stat(opt.FS, src) if err != nil { return onError(src, dest, err, opt) } - return switchboard(src, dest, info, opt) + return switchboard(src, dest, info, opt, inCh) } info, err := os.Lstat(src) if err != nil { return onError(src, dest, err, opt) } - return switchboard(src, dest, info, opt) + + err = switchboard(src, dest, info, opt, inCh) + if err != nil { + close(inCh) + close(outCh) + return err + } + close(inCh) + + return <-errCh } // switchboard switches proper copy functions regarding file type, etc... // If there would be anything else here, add a case to this switchboard. -func switchboard(src, dest string, info os.FileInfo, opt Options) (err error) { +func switchboard(src, dest string, info os.FileInfo, opt Options, inCh chan workerInput) (err error) { if info.Mode()&os.ModeDevice != 0 && !opt.Specials { return onError(src, dest, err, opt) } switch { case info.Mode()&os.ModeSymlink != 0: - err = onsymlink(src, dest, opt) + err = onsymlink(src, dest, opt, inCh) case info.IsDir(): - err = dcopy(src, dest, info, opt) + err = dcopy(src, dest, info, opt, inCh) case info.Mode()&os.ModeNamedPipe != 0: err = pcopy(dest, info) default: - err = fcopy(src, dest, info, opt) + inCh <- workerInput{src, dest, info, opt} } return onError(src, dest, err, opt) @@ -56,7 +79,7 @@ func switchboard(src, dest string, info os.FileInfo, opt Options) (err error) { // copyNextOrSkip decide if this src should be copied or not. // Because this "copy" could be called recursively, // "info" MUST be given here, NOT nil. -func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options) error { +func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options, inCh chan workerInput) error { if opt.Skip != nil { skip, err := opt.Skip(info, src, dest) if err != nil { @@ -66,7 +89,7 @@ func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options) error { return nil } } - return switchboard(src, dest, info, opt) + return switchboard(src, dest, info, opt, inCh) } // fcopy is for just a file, @@ -145,7 +168,7 @@ func fcopy(src, dest string, info os.FileInfo, opt Options) (err error) { // dcopy is for a directory, // with scanning contents inside the directory // and pass everything to "copy" recursively. -func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { +func dcopy(srcdir, destdir string, info os.FileInfo, opt Options, inCh chan workerInput) (err error) { if skip, err := onDirExists(opt, srcdir, destdir); err != nil { return err } else if skip { @@ -186,7 +209,7 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { for _, content := range contents { cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name()) - if err = copyNextOrSkip(cs, cd, content, opt); err != nil { + if err = copyNextOrSkip(cs, cd, content, opt, inCh); err != nil { // If any error, exit immediately return } @@ -224,7 +247,7 @@ func onDirExists(opt Options, srcdir, destdir string) (bool, error) { return false, nil } -func onsymlink(src, dest string, opt Options) error { +func onsymlink(src, dest string, opt Options, inCh chan workerInput) error { switch opt.OnSymlink(src) { case Shallow: if err := lcopy(src, dest); err != nil { @@ -243,7 +266,7 @@ func onsymlink(src, dest string, opt Options) error { if err != nil { return err } - return copyNextOrSkip(orig, dest, info, opt) + return copyNextOrSkip(orig, dest, info, opt, inCh) case Skip: fallthrough default: @@ -282,3 +305,37 @@ func onError(src, dest string, err error, opt Options) error { return opt.OnError(src, dest, err) } + +type workerInput struct { + src string + dest string + info os.FileInfo + opt Options +} + +type workerOutput error + +func startWorkers(numWorkers uint, inCh chan workerInput, outCh chan workerOutput) { + var wg sync.WaitGroup + for workerID := uint(0); workerID < numWorkers; workerID++ { + wg.Add(1) + go worker(&wg, inCh, outCh) + } + wg.Wait() + close(outCh) +} + +func worker(wg *sync.WaitGroup, inCh chan workerInput, outCh chan workerOutput) { + for i := range inCh { + outCh <- fcopy(i.src, i.dest, i.info, i.opt) + } + wg.Done() +} + +func processResults(out chan workerOutput, result chan error) { + var err error + for o := range out { + err = multierr.Append(err, o) + } + result <- err +} diff --git a/go.mod b/go.mod index 2263c9e..c8a1cab 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,6 @@ go 1.18 require ( github.com/otiai10/mint v1.5.1 + go.uber.org/multierr v1.11.0 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 ) diff --git a/go.sum b/go.sum index 7fc5834..7ad8053 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/options.go b/options.go index 1b4e508..35c8d2f 100644 --- a/options.go +++ b/options.go @@ -65,6 +65,11 @@ type Options struct { // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS + // If given, returns the number of workers to use to concurrently perform + // the copying operation. It the returned value is <= 1, a value of 1 is + // used and copying will proceed serially. + Concurrency uint + intent struct { src string dest string From 6a6d5f17244e3f8ced1b58330c38ebbf6a4ea99c Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Sat, 2 Sep 2023 19:21:53 +0900 Subject: [PATCH 02/12] Revert "[attempt by ycombinator] concurrent dcopy (#122)" This reverts commit a403ffcec224d6edf76ce4c2ca82743837693b6c. --- copy.go | 81 ++++++++---------------------------------------------- go.mod | 1 - go.sum | 6 ---- options.go | 5 ---- 4 files changed, 12 insertions(+), 81 deletions(-) diff --git a/copy.go b/copy.go index 560e9eb..085db78 100644 --- a/copy.go +++ b/copy.go @@ -1,13 +1,11 @@ package copy import ( - "go.uber.org/multierr" "io" "io/fs" "io/ioutil" "os" "path/filepath" - "sync" "time" ) @@ -20,57 +18,36 @@ type timespec struct { // Copy copies src to dest, doesn't matter if src is a directory or a file. func Copy(src, dest string, opts ...Options) error { opt := assureOptions(src, dest, opts...) - - var numCopyWorkers uint = 1 - if opt.Concurrency > 1 { - numCopyWorkers = opt.Concurrency - } - - inCh := make(chan workerInput, numCopyWorkers) - outCh := make(chan workerOutput, numCopyWorkers) - errCh := make(chan error) - go startWorkers(numCopyWorkers, inCh, outCh) - go processResults(outCh, errCh) - if opt.FS != nil { info, err := fs.Stat(opt.FS, src) if err != nil { return onError(src, dest, err, opt) } - return switchboard(src, dest, info, opt, inCh) + return switchboard(src, dest, info, opt) } info, err := os.Lstat(src) if err != nil { return onError(src, dest, err, opt) } - - err = switchboard(src, dest, info, opt, inCh) - if err != nil { - close(inCh) - close(outCh) - return err - } - close(inCh) - - return <-errCh + return switchboard(src, dest, info, opt) } // switchboard switches proper copy functions regarding file type, etc... // If there would be anything else here, add a case to this switchboard. -func switchboard(src, dest string, info os.FileInfo, opt Options, inCh chan workerInput) (err error) { +func switchboard(src, dest string, info os.FileInfo, opt Options) (err error) { if info.Mode()&os.ModeDevice != 0 && !opt.Specials { return onError(src, dest, err, opt) } switch { case info.Mode()&os.ModeSymlink != 0: - err = onsymlink(src, dest, opt, inCh) + err = onsymlink(src, dest, opt) case info.IsDir(): - err = dcopy(src, dest, info, opt, inCh) + err = dcopy(src, dest, info, opt) case info.Mode()&os.ModeNamedPipe != 0: err = pcopy(dest, info) default: - inCh <- workerInput{src, dest, info, opt} + err = fcopy(src, dest, info, opt) } return onError(src, dest, err, opt) @@ -79,7 +56,7 @@ func switchboard(src, dest string, info os.FileInfo, opt Options, inCh chan work // copyNextOrSkip decide if this src should be copied or not. // Because this "copy" could be called recursively, // "info" MUST be given here, NOT nil. -func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options, inCh chan workerInput) error { +func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options) error { if opt.Skip != nil { skip, err := opt.Skip(info, src, dest) if err != nil { @@ -89,7 +66,7 @@ func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options, inCh chan w return nil } } - return switchboard(src, dest, info, opt, inCh) + return switchboard(src, dest, info, opt) } // fcopy is for just a file, @@ -168,7 +145,7 @@ func fcopy(src, dest string, info os.FileInfo, opt Options) (err error) { // dcopy is for a directory, // with scanning contents inside the directory // and pass everything to "copy" recursively. -func dcopy(srcdir, destdir string, info os.FileInfo, opt Options, inCh chan workerInput) (err error) { +func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { if skip, err := onDirExists(opt, srcdir, destdir); err != nil { return err } else if skip { @@ -209,7 +186,7 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options, inCh chan work for _, content := range contents { cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name()) - if err = copyNextOrSkip(cs, cd, content, opt, inCh); err != nil { + if err = copyNextOrSkip(cs, cd, content, opt); err != nil { // If any error, exit immediately return } @@ -247,7 +224,7 @@ func onDirExists(opt Options, srcdir, destdir string) (bool, error) { return false, nil } -func onsymlink(src, dest string, opt Options, inCh chan workerInput) error { +func onsymlink(src, dest string, opt Options) error { switch opt.OnSymlink(src) { case Shallow: if err := lcopy(src, dest); err != nil { @@ -266,7 +243,7 @@ func onsymlink(src, dest string, opt Options, inCh chan workerInput) error { if err != nil { return err } - return copyNextOrSkip(orig, dest, info, opt, inCh) + return copyNextOrSkip(orig, dest, info, opt) case Skip: fallthrough default: @@ -305,37 +282,3 @@ func onError(src, dest string, err error, opt Options) error { return opt.OnError(src, dest, err) } - -type workerInput struct { - src string - dest string - info os.FileInfo - opt Options -} - -type workerOutput error - -func startWorkers(numWorkers uint, inCh chan workerInput, outCh chan workerOutput) { - var wg sync.WaitGroup - for workerID := uint(0); workerID < numWorkers; workerID++ { - wg.Add(1) - go worker(&wg, inCh, outCh) - } - wg.Wait() - close(outCh) -} - -func worker(wg *sync.WaitGroup, inCh chan workerInput, outCh chan workerOutput) { - for i := range inCh { - outCh <- fcopy(i.src, i.dest, i.info, i.opt) - } - wg.Done() -} - -func processResults(out chan workerOutput, result chan error) { - var err error - for o := range out { - err = multierr.Append(err, o) - } - result <- err -} diff --git a/go.mod b/go.mod index c8a1cab..2263c9e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,5 @@ go 1.18 require ( github.com/otiai10/mint v1.5.1 - go.uber.org/multierr v1.11.0 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 ) diff --git a/go.sum b/go.sum index 7ad8053..7fc5834 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,4 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/options.go b/options.go index 35c8d2f..1b4e508 100644 --- a/options.go +++ b/options.go @@ -65,11 +65,6 @@ type Options struct { // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS - // If given, returns the number of workers to use to concurrently perform - // the copying operation. It the returned value is <= 1, a value of 1 is - // used and copying will proceed serially. - Concurrency uint - intent struct { src string dest string From 78d861cac4fc74700eee5b17eee6b799425f329e Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Sat, 2 Sep 2023 19:50:51 +0900 Subject: [PATCH 03/12] Add options for concurrent dcopy --- copy.go | 7 +++++++ go.mod | 1 + go.sum | 2 ++ options.go | 44 ++++++++++++++++++++++++++++++++++++-------- 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/copy.go b/copy.go index 085db78..3291634 100644 --- a/copy.go +++ b/copy.go @@ -1,12 +1,15 @@ package copy import ( + "context" "io" "io/fs" "io/ioutil" "os" "path/filepath" "time" + + "golang.org/x/sync/semaphore" ) type timespec struct { @@ -18,6 +21,10 @@ type timespec struct { // Copy copies src to dest, doesn't matter if src is a directory or a file. func Copy(src, dest string, opts ...Options) error { opt := assureOptions(src, dest, opts...) + if opt.NumberOfWorkers > 1 { + opt.intent.sem = semaphore.NewWeighted(opt.NumberOfWorkers) + opt.intent.ctx = context.Background() + } if opt.FS != nil { info, err := fs.Stat(opt.FS, src) if err != nil { diff --git a/go.mod b/go.mod index 2263c9e..d0f762b 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,6 @@ go 1.18 require ( github.com/otiai10/mint v1.5.1 + golang.org/x/sync v0.3.0 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 ) diff --git a/go.sum b/go.sum index 7fc5834..23993a8 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/options.go b/options.go index 1b4e508..978484a 100644 --- a/options.go +++ b/options.go @@ -1,9 +1,12 @@ package copy import ( + "context" "io" "io/fs" "os" + + "golang.org/x/sync/semaphore" ) // Options specifies optional actions on copying. @@ -65,10 +68,28 @@ type Options struct { // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS - intent struct { - src string - dest string - } + // NumberOfWorkers represents the number of workers used for + // concurrent copying contents of directories. + // If 0 or 1, it does not use goroutine for copying directories. + // Please refer to https://pkg.go.dev/golang.org/x/sync/semaphore for more details. + NumberOfWorkers int64 + + // PreferConcurrent is a function to determine whether or not + // to use goroutine for copying contents of directories. + // If PreferConcurrent is nil, which is default, it does concurrent + // copying for all directories. + // If NumberOfWorkers is 0 or 1, this function will be ignored. + PreferConcurrent func(srcdir, destdir string) (bool, error) + + // Internal use only + intent intent +} + +type intent struct { + src string + dest string + sem *semaphore.Weighted + ctx context.Context } // SymlinkAction represents what to do on symlink. @@ -112,10 +133,7 @@ func getDefaultOptions(src, dest string) Options { PreserveTimes: false, // Do not preserve the modification time CopyBufferSize: 0, // Do not specify, use default bufsize (32*1024) WrapReader: nil, // Do not wrap src files, use them as they are. - intent: struct { - src string - dest string - }{src, dest}, + intent: intent{src, dest, nil, nil}, } } @@ -141,3 +159,13 @@ func assureOptions(src, dest string, opts ...Options) Options { opts[0].intent.dest = defopt.intent.dest return opts[0] } + +func shouldCopyDirectoryConcurrent(opt Options, srcdir, destdir string) (bool, error) { + if opt.NumberOfWorkers <= 1 { + return false, nil + } + if opt.PreferConcurrent == nil { + return true, nil + } + return opt.PreferConcurrent(srcdir, destdir) +} From 4c9bb1f3e01051506cd87e65be991fd2cb0797b7 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Sun, 3 Sep 2023 17:45:10 +0900 Subject: [PATCH 04/12] Add tests for concurrent dcopy --- all_test.go | 14 ++++++++++++++ test/data/case19/README.md | 1 + test/data/case19/bar_sequential/aaa.txt | 0 test/data/case19/bar_sequential/bbb.txt | 0 test/data/case19/foo_concurrent/ccc.txt | 0 test/data/case19/foo_concurrent/ddd.txt | 0 6 files changed, 15 insertions(+) create mode 100644 test/data/case19/README.md create mode 100644 test/data/case19/bar_sequential/aaa.txt create mode 100644 test/data/case19/bar_sequential/bbb.txt create mode 100644 test/data/case19/foo_concurrent/ccc.txt create mode 100644 test/data/case19/foo_concurrent/ddd.txt diff --git a/all_test.go b/all_test.go index fbd12e5..6256b0b 100644 --- a/all_test.go +++ b/all_test.go @@ -475,3 +475,17 @@ func (r *SleepyReader) Read(p []byte) (int, error) { } return n, e } + +func TestOptions_NumberOfWorkers(t *testing.T) { + opt := Options{NumberOfWorkers: 3} + err := Copy("test/data/case19", "test/data.copy/case19", opt) + Expect(t, err).ToBe(nil) +} + +func TestOptions_PreferConcurrent(t *testing.T) { + opt := Options{NumberOfWorkers: 4, PreferConcurrent: func(sd, dd string) (bool, error) { + return strings.HasSuffix(sd, "concurrent"), nil + }} + err := Copy("test/data/case19", "test/data.copy/case19_preferconcurrent", opt) + Expect(t, err).ToBe(nil) +} diff --git a/test/data/case19/README.md b/test/data/case19/README.md new file mode 100644 index 0000000..0c54877 --- /dev/null +++ b/test/data/case19/README.md @@ -0,0 +1 @@ +# Concurrent case \ No newline at end of file diff --git a/test/data/case19/bar_sequential/aaa.txt b/test/data/case19/bar_sequential/aaa.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/data/case19/bar_sequential/bbb.txt b/test/data/case19/bar_sequential/bbb.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/data/case19/foo_concurrent/ccc.txt b/test/data/case19/foo_concurrent/ccc.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/data/case19/foo_concurrent/ddd.txt b/test/data/case19/foo_concurrent/ddd.txt new file mode 100644 index 0000000..e69de29 From e88ee39b725bc54b9c00214b020b9db87375c8c4 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Sun, 3 Sep 2023 17:45:32 +0900 Subject: [PATCH 05/12] Implementation for concurrent dcopy --- copy.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/copy.go b/copy.go index 3291634..6b1510b 100644 --- a/copy.go +++ b/copy.go @@ -9,6 +9,7 @@ import ( "path/filepath" "time" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) @@ -190,12 +191,15 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { return } - for _, content := range contents { - cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name()) - - if err = copyNextOrSkip(cs, cd, content, opt); err != nil { - // If any error, exit immediately - return + if yes, err := shouldCopyDirectoryConcurrent(opt, srcdir, destdir); err != nil { + return err + } else if yes { + if err := dcopyConcurrent(srcdir, destdir, contents, opt); err != nil { + return err + } + } else { + if err := dcopySequential(srcdir, destdir, contents, opt); err != nil { + return err } } @@ -214,6 +218,52 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { return } +func dcopySequential(srcdir, destdir string, contents []os.FileInfo, opt Options) error { + for _, content := range contents { + cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name()) + + if err := copyNextOrSkip(cs, cd, content, opt); err != nil { + // If any error, exit immediately + return err + } + } + return nil +} + +// Copy this directory concurrently regarding semaphore of opt.intent +func dcopyConcurrent(srcdir, destdir string, contents []os.FileInfo, opt Options) error { + group, ctx := errgroup.WithContext(opt.intent.ctx) + cancelctx, cancel := context.WithCancel(ctx) + getDcopyRoutine := func(cs, cd string, content os.FileInfo) func() error { + return func() error { + select { + case <-cancelctx.Done(): + return nil + case <-opt.intent.ctx.Done(): + return nil + default: + if err := opt.intent.sem.Acquire(cancelctx, 1); err != nil { + cancel() + return err + } + err := copyNextOrSkip(cs, cd, content, opt) + opt.intent.sem.Release(1) + if err != nil { + cancel() + return err + } + return nil + } + } + } + for _, content := range contents { + csd := filepath.Join(srcdir, content.Name()) + cdd := filepath.Join(destdir, content.Name()) + group.Go(getDcopyRoutine(csd, cdd, content)) + } + return group.Wait() +} + func onDirExists(opt Options, srcdir, destdir string) (bool, error) { _, err := os.Stat(destdir) if err == nil && opt.OnDirExists != nil && destdir != opt.intent.dest { From 87cb224c8e3e7f644a8d9ae5664b44967092ddd3 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Sun, 3 Sep 2023 18:01:23 +0900 Subject: [PATCH 06/12] Update README --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1cc8fc8..fd8da9e 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,19 @@ type Options struct { // If given, copy.Copy refers to this fs.FS instead of the OS filesystem. // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS + + // NumberOfWorkers represents the number of workers used for + // concurrent copying contents of directories. + // If 0 or 1, it does not use goroutine for copying directories. + // Please refer to https://pkg.go.dev/golang.org/x/sync/semaphore for more details. + NumberOfWorkers int64 + + // PreferConcurrent is a function to determine whether or not + // to use goroutine for copying contents of directories. + // If PreferConcurrent is nil, which is default, it does concurrent + // copying for all directories. + // If NumberOfWorkers is 0 or 1, this function will be ignored. + PreferConcurrent func(srcdir, destdir string) (bool, error) } ``` @@ -105,4 +118,4 @@ err := Copy("your/directory", "your/directory.copy", opt) ## License -[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fotiai10%2Fcopy.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fotiai10%2Fcopy?ref=badge_large) \ No newline at end of file +[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fotiai10%2Fcopy.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fotiai10%2Fcopy?ref=badge_large) From c165d68ab3152330c5eb65e1f061bf6e44447fb4 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Thu, 7 Sep 2023 15:13:11 +0900 Subject: [PATCH 07/12] Add more test files for case19 to fail Since it acquires semaphore worker for directory and locks until it ends, there might be "never-ending" semaphore. --- test/data/case19/foo_concurrent/baz_concurrent/eee.txt | 0 .../case19/foo_concurrent/baz_concurrent/hoge_concurrent/fff.txt | 0 .../baz_concurrent/hoge_concurrent/fuga_concurrent/ggg.txt | 0 3 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 test/data/case19/foo_concurrent/baz_concurrent/eee.txt create mode 100644 test/data/case19/foo_concurrent/baz_concurrent/hoge_concurrent/fff.txt create mode 100644 test/data/case19/foo_concurrent/baz_concurrent/hoge_concurrent/fuga_concurrent/ggg.txt diff --git a/test/data/case19/foo_concurrent/baz_concurrent/eee.txt b/test/data/case19/foo_concurrent/baz_concurrent/eee.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/data/case19/foo_concurrent/baz_concurrent/hoge_concurrent/fff.txt b/test/data/case19/foo_concurrent/baz_concurrent/hoge_concurrent/fff.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/data/case19/foo_concurrent/baz_concurrent/hoge_concurrent/fuga_concurrent/ggg.txt b/test/data/case19/foo_concurrent/baz_concurrent/hoge_concurrent/fuga_concurrent/ggg.txt new file mode 100644 index 0000000..e69de29 From 79342f543614b767bc479867a75d2527d5c560a9 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Thu, 7 Sep 2023 15:17:05 +0900 Subject: [PATCH 08/12] Do not acquire semaphore worker for directory --- copy.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/copy.go b/copy.go index 6b1510b..5fd1607 100644 --- a/copy.go +++ b/copy.go @@ -242,6 +242,9 @@ func dcopyConcurrent(srcdir, destdir string, contents []os.FileInfo, opt Options case <-opt.intent.ctx.Done(): return nil default: + if content.IsDir() { + return copyNextOrSkip(cs, cd, content, opt) + } if err := opt.intent.sem.Acquire(cancelctx, 1); err != nil { cancel() return err From a5478eaada3cb9f2a096844031d2eed0b6238b0f Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Thu, 7 Sep 2023 15:26:51 +0900 Subject: [PATCH 09/12] Remove unnecessary `cancel` context errorgroup.WithContext can cancel sibling routines when a routine returns non-nil error. See https://cs.opensource.google/go/x/sync/+/refs/tags/v0.3.0:errgroup/errgroup.go;l=40-48 --- copy.go | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/copy.go b/copy.go index 5fd1607..dab26df 100644 --- a/copy.go +++ b/copy.go @@ -233,36 +233,26 @@ func dcopySequential(srcdir, destdir string, contents []os.FileInfo, opt Options // Copy this directory concurrently regarding semaphore of opt.intent func dcopyConcurrent(srcdir, destdir string, contents []os.FileInfo, opt Options) error { group, ctx := errgroup.WithContext(opt.intent.ctx) - cancelctx, cancel := context.WithCancel(ctx) - getDcopyRoutine := func(cs, cd string, content os.FileInfo) func() error { + getRoutine := func(cs, cd string, content os.FileInfo) func() error { return func() error { - select { - case <-cancelctx.Done(): - return nil - case <-opt.intent.ctx.Done(): - return nil - default: - if content.IsDir() { - return copyNextOrSkip(cs, cd, content, opt) - } - if err := opt.intent.sem.Acquire(cancelctx, 1); err != nil { - cancel() - return err - } - err := copyNextOrSkip(cs, cd, content, opt) - opt.intent.sem.Release(1) - if err != nil { - cancel() - return err - } - return nil + if content.IsDir() { + return copyNextOrSkip(cs, cd, content, opt) } + if err := opt.intent.sem.Acquire(ctx, 1); err != nil { + return err + } + err := copyNextOrSkip(cs, cd, content, opt) + opt.intent.sem.Release(1) + if err != nil { + return err + } + return nil } } for _, content := range contents { csd := filepath.Join(srcdir, content.Name()) cdd := filepath.Join(destdir, content.Name()) - group.Go(getDcopyRoutine(csd, cdd, content)) + group.Go(getRoutine(csd, cdd, content)) } return group.Wait() } From 3eebd945bcdfb64ef50e68fe719600747819f8a4 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Thu, 7 Sep 2023 15:30:35 +0900 Subject: [PATCH 10/12] Refactor: Remove useless if-condition --- copy.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/copy.go b/copy.go index dab26df..6662f84 100644 --- a/copy.go +++ b/copy.go @@ -243,10 +243,7 @@ func dcopyConcurrent(srcdir, destdir string, contents []os.FileInfo, opt Options } err := copyNextOrSkip(cs, cd, content, opt) opt.intent.sem.Release(1) - if err != nil { - return err - } - return nil + return err } } for _, content := range contents { From 0b8d7bdb1f15bcebfb88483b9c7e5ea7e8639082 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Thu, 7 Sep 2023 15:50:10 +0900 Subject: [PATCH 11/12] Add benchmark test --- .github/workflows/go.yml | 19 ++++++++++++++++++- benchmark_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 benchmark_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 8413309..a6b05d2 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -16,7 +16,6 @@ jobs: os: [ubuntu-latest, macos-latest, windows-latest] go: ['1.18', '1.19', '1.20'] steps: - - name: Set up Go uses: actions/setup-go@v4 with: @@ -34,3 +33,21 @@ jobs: - name: Test run: go test -v --tags=go${{ matrix.go }} + benchmark: + name: Benchmark + runs-on: ubuntu-latest + strategy: + matrix: + go: ['1.21'] + steps: + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go }} + id: go + - name: Check out code into the Go module directory + uses: actions/checkout@v3 + - name: Get dependencies + run: go get -v -t -d ./... + - name: Benchmark + run: go test -bench . -benchmem -benchtime 8s \ No newline at end of file diff --git a/benchmark_test.go b/benchmark_test.go new file mode 100644 index 0000000..0817440 --- /dev/null +++ b/benchmark_test.go @@ -0,0 +1,38 @@ +package copy + +import ( + "fmt" + "testing" +) + +func BenchmarkOptions_NumberOfWorkers_0(b *testing.B) { + var num int64 = 0 // 0 or 1 = single-threaded + opt := Options{NumberOfWorkers: num} + for i := 0; i < b.N; i++ { + Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) + } +} + +func BenchmarkOptions_NumberOfWorkers_2(b *testing.B) { + var num int64 = 2 + opt := Options{NumberOfWorkers: num} + for i := 0; i < b.N; i++ { + Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) + } +} + +func BenchmarkOptions_NumberOfWorkers_4(b *testing.B) { + var num int64 = 4 + opt := Options{NumberOfWorkers: num} + for i := 0; i < b.N; i++ { + Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) + } +} + +func BenchmarkOptions_NumberOfWorkers_8(b *testing.B) { + var num int64 = 8 + opt := Options{NumberOfWorkers: num} + for i := 0; i < b.N; i++ { + Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) + } +} From 4118a68e25216eefa8825f742e0f6b287d8c3ff1 Mon Sep 17 00:00:00 2001 From: Hiromu OCHIAI Date: Thu, 7 Sep 2023 15:53:44 +0900 Subject: [PATCH 12/12] Change interface name: NumOfWorkers --- README.md | 6 +++--- all_test.go | 6 +++--- benchmark_test.go | 16 ++++++++-------- copy.go | 4 ++-- options.go | 8 ++++---- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index fd8da9e..7c8b4e0 100644 --- a/README.md +++ b/README.md @@ -87,17 +87,17 @@ type Options struct { // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS - // NumberOfWorkers represents the number of workers used for + // NumOfWorkers represents the number of workers used for // concurrent copying contents of directories. // If 0 or 1, it does not use goroutine for copying directories. // Please refer to https://pkg.go.dev/golang.org/x/sync/semaphore for more details. - NumberOfWorkers int64 + NumOfWorkers int64 // PreferConcurrent is a function to determine whether or not // to use goroutine for copying contents of directories. // If PreferConcurrent is nil, which is default, it does concurrent // copying for all directories. - // If NumberOfWorkers is 0 or 1, this function will be ignored. + // If NumOfWorkers is 0 or 1, this function will be ignored. PreferConcurrent func(srcdir, destdir string) (bool, error) } ``` diff --git a/all_test.go b/all_test.go index 6256b0b..13bae07 100644 --- a/all_test.go +++ b/all_test.go @@ -476,14 +476,14 @@ func (r *SleepyReader) Read(p []byte) (int, error) { return n, e } -func TestOptions_NumberOfWorkers(t *testing.T) { - opt := Options{NumberOfWorkers: 3} +func TestOptions_NumOfWorkers(t *testing.T) { + opt := Options{NumOfWorkers: 3} err := Copy("test/data/case19", "test/data.copy/case19", opt) Expect(t, err).ToBe(nil) } func TestOptions_PreferConcurrent(t *testing.T) { - opt := Options{NumberOfWorkers: 4, PreferConcurrent: func(sd, dd string) (bool, error) { + opt := Options{NumOfWorkers: 4, PreferConcurrent: func(sd, dd string) (bool, error) { return strings.HasSuffix(sd, "concurrent"), nil }} err := Copy("test/data/case19", "test/data.copy/case19_preferconcurrent", opt) diff --git a/benchmark_test.go b/benchmark_test.go index 0817440..3a3e153 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -5,33 +5,33 @@ import ( "testing" ) -func BenchmarkOptions_NumberOfWorkers_0(b *testing.B) { +func BenchmarkOptions_NumOfWorkers_0(b *testing.B) { var num int64 = 0 // 0 or 1 = single-threaded - opt := Options{NumberOfWorkers: num} + opt := Options{NumOfWorkers: num} for i := 0; i < b.N; i++ { Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) } } -func BenchmarkOptions_NumberOfWorkers_2(b *testing.B) { +func BenchmarkOptions_NumOfWorkers_2(b *testing.B) { var num int64 = 2 - opt := Options{NumberOfWorkers: num} + opt := Options{NumOfWorkers: num} for i := 0; i < b.N; i++ { Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) } } -func BenchmarkOptions_NumberOfWorkers_4(b *testing.B) { +func BenchmarkOptions_NumOfWorkers_4(b *testing.B) { var num int64 = 4 - opt := Options{NumberOfWorkers: num} + opt := Options{NumOfWorkers: num} for i := 0; i < b.N; i++ { Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) } } -func BenchmarkOptions_NumberOfWorkers_8(b *testing.B) { +func BenchmarkOptions_NumOfWorkers_8(b *testing.B) { var num int64 = 8 - opt := Options{NumberOfWorkers: num} + opt := Options{NumOfWorkers: num} for i := 0; i < b.N; i++ { Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt) } diff --git a/copy.go b/copy.go index 6662f84..2979af9 100644 --- a/copy.go +++ b/copy.go @@ -22,8 +22,8 @@ type timespec struct { // Copy copies src to dest, doesn't matter if src is a directory or a file. func Copy(src, dest string, opts ...Options) error { opt := assureOptions(src, dest, opts...) - if opt.NumberOfWorkers > 1 { - opt.intent.sem = semaphore.NewWeighted(opt.NumberOfWorkers) + if opt.NumOfWorkers > 1 { + opt.intent.sem = semaphore.NewWeighted(opt.NumOfWorkers) opt.intent.ctx = context.Background() } if opt.FS != nil { diff --git a/options.go b/options.go index 978484a..1fbfcb1 100644 --- a/options.go +++ b/options.go @@ -68,17 +68,17 @@ type Options struct { // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS - // NumberOfWorkers represents the number of workers used for + // NumOfWorkers represents the number of workers used for // concurrent copying contents of directories. // If 0 or 1, it does not use goroutine for copying directories. // Please refer to https://pkg.go.dev/golang.org/x/sync/semaphore for more details. - NumberOfWorkers int64 + NumOfWorkers int64 // PreferConcurrent is a function to determine whether or not // to use goroutine for copying contents of directories. // If PreferConcurrent is nil, which is default, it does concurrent // copying for all directories. - // If NumberOfWorkers is 0 or 1, this function will be ignored. + // If NumOfWorkers is 0 or 1, this function will be ignored. PreferConcurrent func(srcdir, destdir string) (bool, error) // Internal use only @@ -161,7 +161,7 @@ func assureOptions(src, dest string, opts ...Options) Options { } func shouldCopyDirectoryConcurrent(opt Options, srcdir, destdir string) (bool, error) { - if opt.NumberOfWorkers <= 1 { + if opt.NumOfWorkers <= 1 { return false, nil } if opt.PreferConcurrent == nil {