diff --git a/README.MD b/README.MD index 88b3f057..904c35d7 100644 --- a/README.MD +++ b/README.MD @@ -199,7 +199,7 @@ storages: # 节点附加数据 data: # 最多同时发起的连接数 - max-conn: 16 + max-conn: 24 # 启动之前生成 1-200MB 的测速文件 (默认为动态生成) pre-gen-measures: false # 设置为 true 后将跟踪 302 请求 (即不会将最终用户重定向到网盘) diff --git a/cluster.go b/cluster.go index ee98babb..884fe20a 100644 --- a/cluster.go +++ b/cluster.go @@ -659,12 +659,19 @@ func (cr *Cluster) checkFileFor( checkingHashMux sync.Mutex checkingHash string lastCheckingHash string + sema = NewSemaphore(storage.MaxOpen()) ) bar := pg.AddBar((int64)(len(files)), mpb.BarRemoveOnComplete(), mpb.PrependDecorators( decor.Name("> Checking "+storage.String()), + decor.OnCondition( + decor.Any(func(decor.Statistics) string { + return fmt.Sprintf("(%d / %d) ", sema.Len(), sema.Cap()) + }), + heavy, + ), ), mpb.AppendDecorators( decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), @@ -704,43 +711,45 @@ func (cr *Cluster) checkFileFor( } if f.Size == 0 { logDebugf("Skipped empty file %s", hash) - goto CONTINUE - } - if size, ok := sizeMap[hash]; ok { + } else if size, ok := sizeMap[hash]; ok { if size != f.Size { logWarnf("Found modified file: size of %q is %d, expect %d", hash, size, f.Size) - goto MISSING - } - if heavy { + addMissing(f) + } else if heavy { hashMethod, err := getHashMethod(len(hash)) if err != nil { logErrorf("Unknown hash method for %q", hash) - goto CONTINUE - } - hw := hashMethod.New() - - r, err := storage.Open(hash) - if err != nil { - logErrorf("Could not open %q: %v", hash, err) - goto MISSING - } - _, err = io.CopyBuffer(hw, r, buf[:]) - r.Close() - if err != nil { - logErrorf("Could not calculate hash for %q: %v", hash, err) - goto CONTINUE - } - if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash { - logWarnf("Found modified file: hash of %q is %s, expect %s", hash, hs, f.Hash) - goto MISSING + } else { + if !sema.AcquireWithContext(ctx) { + return + } + go func(f FileInfo) { + defer sema.Release() + r, err := storage.Open(hash) + if err != nil { + logErrorf("Could not open %q: %v", hash, err) + } else { + hw := hashMethod.New() + _, err = io.CopyBuffer(hw, r, buf[:]) + r.Close() + if err != nil { + logErrorf("Could not calculate hash for %s: %v", hash, err) + } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != hash { + logWarnf("Found modified file: hash of %s became %s", hash, hs) + } else { + return + } + } + addMissing(f) + bar.EwmaIncrement(time.Since(start)) + }(f) + continue } } - goto CONTINUE + } else { + logDebugf("Could not found file %q", hash) + addMissing(f) } - logDebugf("Could not found file %q", hash) - MISSING: - addMissing(f) - CONTINUE: bar.EwmaIncrement(time.Since(start)) } diff --git a/cmd_webdav.go b/cmd_webdav.go index 12b6a686..93bf1ba1 100644 --- a/cmd_webdav.go +++ b/cmd_webdav.go @@ -68,7 +68,11 @@ func cmdUploadWebdav(args []string) { logInfof("From: %s", local.String()) webdavs := make([]*WebDavStorage, len(webdavOpts)) + maxProc := 0 for i, opt := range webdavOpts { + if opt.MaxConn > maxProc { + maxProc = opt.MaxConn + } s := new(WebDavStorage) s.SetOptions(opt) if err := s.Init(ctx, nil); err != nil { @@ -79,10 +83,11 @@ func cmdUploadWebdav(args []string) { webdavs[i] = s } - var barUnit decor.SizeB1024 - maxProc := runtime.GOMAXPROCS(0) * 4 if maxProc < 1 { - maxProc = 1 + maxProc = runtime.GOMAXPROCS(0) * 4 + if maxProc < 1 { + maxProc = 1 + } } slots := make(chan int, maxProc) for i := 0; i < maxProc; i++ { @@ -109,6 +114,7 @@ func cmdUploadWebdav(args []string) { return nil }) + var barUnit decor.SizeB1024 var wg sync.WaitGroup pg := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithAutoRefresh()) setLogOutput(pg) @@ -124,12 +130,15 @@ func cmdUploadWebdav(args []string) { decor.EwmaETA(decor.ET_STYLE_GO, 30), ), ) - webdavBar.SetTotal((int64)(len(localFiles)*len(webdavs)), false) + webdavBar.SetTotal((int64)(len(localFiles)*2*len(webdavs)), false) for i, s := range webdavs { start := time.Now() fileSet := make(map[string]int64) err := s.WalkDir(func(hash string, size int64) error { fileSet[hash] = size + now := time.Now() + webdavBar.EwmaIncrement(now.Sub(start)) + start = now return nil }) if err != nil { @@ -145,10 +154,12 @@ func cmdUploadWebdav(args []string) { }) totalSize += size } + now := time.Now() + webdavBar.EwmaIncrement(now.Sub(start)) + start = now } totalFiles += (int64)(len(files)) webdavFiles[i] = files - webdavBar.EwmaIncrement(time.Since(start)) } webdavBar.SetTotal(-1, true) webdavBar.Wait() @@ -174,7 +185,7 @@ func cmdUploadWebdav(args []string) { ), ) - logDebugf("Max Proc: %d", maxProc) + logInfof("Max Proc: %d", maxProc) for i, files := range webdavFiles { s := webdavs[i] diff --git a/config.yaml b/config.yaml index ae523289..cde83f07 100644 --- a/config.yaml +++ b/config.yaml @@ -33,13 +33,6 @@ storages: data: cache-path: cache compressor: "" - - type: webdav - id: webdav-storage-1 - weight: 0 - data: - pre-gen-measures: false - follow-redirect: false - redirect-link-cache: 0s webdav-users: example-user: endpoint: https://webdav.example.com/path/to/endpoint/ diff --git a/storage.go b/storage.go index 77def4a6..fcfc07e2 100644 --- a/storage.go +++ b/storage.go @@ -41,6 +41,8 @@ type Storage interface { // Init will be called before start to use a storage Init(context.Context, *Cluster) error + // The maximum number of reader can be open at a time. zero means unlimited + MaxOpen() int Size(hash string) (int64, error) Open(hash string) (io.ReadCloser, error) Create(hash string, r io.ReadSeeker) error diff --git a/storage_local.go b/storage_local.go index 1fbc4dba..e3a2d883 100644 --- a/storage_local.go +++ b/storage_local.go @@ -80,6 +80,10 @@ func (s *LocalStorage) Init(context.Context, *Cluster) (err error) { return } +func (s *LocalStorage) MaxOpen() int { + return 1024 +} + func (s *LocalStorage) hashToPath(hash string) string { return filepath.Join(s.opt.CachePath, hash[0:2], hash) } diff --git a/storage_mount.go b/storage_mount.go index ffcc1a50..838eccbd 100644 --- a/storage_mount.go +++ b/storage_mount.go @@ -117,6 +117,10 @@ func (s *MountStorage) Init(ctx context.Context, _ *Cluster) (err error) { return } +func (s *MountStorage) MaxOpen() int { + return 64 +} + func (s *MountStorage) hashToPath(hash string) string { return filepath.Join(s.opt.CachePath(), hash[0:2], hash) } diff --git a/storage_webdav.go b/storage_webdav.go index 0bedf604..15461136 100644 --- a/storage_webdav.go +++ b/storage_webdav.go @@ -62,7 +62,7 @@ func (o *WebDavStorageOption) MarshalYAML() (any, error) { func (o *WebDavStorageOption) UnmarshalYAML(n *yaml.Node) (err error) { // set default values - o.MaxConn = 16 + o.MaxConn = 24 o.PreGenMeasures = false o.FollowRedirect = false o.RedirectLinkCache = 0 @@ -106,6 +106,7 @@ type WebDavStorage struct { cache Cache cli *gowebdav.Client + slots *Semaphore } var _ Storage = (*WebDavStorage)(nil) @@ -168,6 +169,8 @@ func (s *WebDavStorage) Init(ctx context.Context, cluster *Cluster) (err error) s.cli = gowebdav.NewClient(s.opt.GetEndPoint(), s.opt.GetUsername(), s.opt.GetPassword()) s.cli.SetHeader("User-Agent", ClusterUserAgentFull) + s.slots = NewSemaphore(s.opt.MaxConn) + if err := s.cli.Mkdir("measure", 0755); err != nil { if !webdavIsHTTPError(err, http.StatusConflict) { logWarnf("Could not create measure folder for %s: %v", s.String(), err) @@ -195,6 +198,10 @@ func (s *WebDavStorage) putFile(path string, r io.ReadSeeker) error { return err } logDebugf("Putting %q", target) + + s.slots.Acquire() + defer s.slots.Release() + cr := &countReader{r, 0} req, err := http.NewRequestWithContext(context.TODO(), http.MethodPut, target, cr) if err != nil { @@ -216,12 +223,24 @@ func (s *WebDavStorage) putFile(path string, r io.ReadSeeker) error { } } +func (s *WebDavStorage) MaxOpen() int { + if s.opt.MaxConn <= 0 { + return 0 + } + if s.opt.MaxConn > 4 { + return s.opt.MaxConn - 4 + } + return 1 +} + func (s *WebDavStorage) hashToPath(hash string) string { return path.Join("download", hash[0:2], hash) } func (s *WebDavStorage) Size(hash string) (int64, error) { + s.slots.Acquire() stat, err := s.cli.Stat(s.hashToPath(hash)) + s.slots.Release() if err != nil { return 0, err } @@ -229,7 +248,12 @@ func (s *WebDavStorage) Size(hash string) (int64, error) { } func (s *WebDavStorage) Open(hash string) (r io.ReadCloser, err error) { - r, err = s.cli.ReadStream(s.hashToPath(hash)) + s.slots.Acquire() + if r, err = s.cli.ReadStream(s.hashToPath(hash)); err != nil { + s.slots.Release() + return + } + r = s.slots.ProxyReader(r) return } @@ -238,10 +262,15 @@ func (s *WebDavStorage) Create(hash string, r io.ReadSeeker) error { } func (s *WebDavStorage) Remove(hash string) error { + s.slots.Acquire() + defer s.slots.Release() return s.cli.Remove(s.hashToPath(hash)) } func (s *WebDavStorage) WalkDir(walker func(hash string, size int64) error) error { + s.slots.Acquire() + defer s.slots.Release() + for _, dir := range hex256 { files, err := s.cli.ReadDir(path.Join("download", dir)) if err != nil { @@ -318,6 +347,10 @@ func (s *WebDavStorage) ServeDownload(rw http.ResponseWriter, req *http.Request, cli = http.DefaultClient } + if !s.slots.AcquireWithContext(req.Context()) { + return 0, req.Context().Err() + } + defer s.slots.Release() resp, err := cli.Do(tgReq) if err != nil { return 0, err @@ -384,6 +417,11 @@ func (s *WebDavStorage) ServeMeasure(rw http.ResponseWriter, req *http.Request, copyHeader("If-None-Match", tgReq.Header, req.Header) copyHeader("If-Match", tgReq.Header, req.Header) copyHeader("If-Range", tgReq.Header, req.Header) + + if !s.slots.AcquireWithContext(req.Context()) { + return req.Context().Err() + } + defer s.slots.Release() resp, err := noRedirectCli.Do(tgReq) if err != nil { return err diff --git a/util.go b/util.go index dd02cd28..5653ba28 100644 --- a/util.go +++ b/util.go @@ -37,6 +37,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "gopkg.in/yaml.v3" @@ -481,3 +482,81 @@ func (s *BufSlots) Alloc(ctx context.Context) (slotId int, buf []byte, free func return 0, nil, nil } } + +type Semaphore struct { + c chan struct{} +} + +// NewSemaphore create a semaphore +// zero or negative size means infinity space +func NewSemaphore(size int) *Semaphore { + if size <= 0 { + return nil + } + return &Semaphore{ + c: make(chan struct{}, size), + } +} + +func (s *Semaphore) Len() int { + if s == nil { + return 0 + } + return len(s.c) +} + +func (s *Semaphore) Cap() int { + if s == nil { + return 0 + } + return cap(s.c) +} + +func (s *Semaphore) Acquire() { + if s == nil { + return + } + s.c <- struct{}{} +} + +func (s *Semaphore) AcquireWithContext(ctx context.Context) bool { + if s == nil { + return true + } + select { + case s.c <- struct{}{}: + return true + case <-ctx.Done(): + return false + } +} + +func (s *Semaphore) Release() { + if s == nil { + return + } + <-s.c +} + +type spProxyReader struct { + io.Reader + released atomic.Bool + s *Semaphore +} + +func (r *spProxyReader) Close() error { + if !r.released.Swap(true) { + r.s.Release() + } + if c, ok := r.Reader.(io.Closer); ok { + return c.Close() + } + return nil +} + +func (s *Semaphore) ProxyReader(r io.Reader) io.ReadCloser { + return &spProxyReader{ + Reader: r, + s: s, + } +}