make heavy check parallel
zyxkad committed Feb 21, 2024
1 parent f5e4918 · commit f81cb58
Showing 9 changed files with 185 additions and 45 deletions.
README.MD (2 changes: 1 addition & 1 deletion)
@@ -199,7 +199,7 @@ storages:
# Additional data for this storage node
data:
# Maximum number of simultaneous connections
max-conn: 16
max-conn: 24
# Generate the 1-200MB speed-test files before startup (default: generate them dynamically)
pre-gen-measures: false
# When set to true, 302 responses are followed (i.e. the end user is not redirected to the cloud drive)
cluster.go (67 changes: 38 additions & 29 deletions)
@@ -659,12 +659,19 @@ func (cr *Cluster) checkFileFor(
checkingHashMux sync.Mutex
checkingHash string
lastCheckingHash string
sema = NewSemaphore(storage.MaxOpen())
)

bar := pg.AddBar((int64)(len(files)),
mpb.BarRemoveOnComplete(),
mpb.PrependDecorators(
decor.Name("> Checking "+storage.String()),
decor.OnCondition(
decor.Any(func(decor.Statistics) string {
return fmt.Sprintf("(%d / %d) ", sema.Len(), sema.Cap())
}),
heavy,
),
),
mpb.AppendDecorators(
decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR),
@@ -704,43 +711,45 @@
}
if f.Size == 0 {
logDebugf("Skipped empty file %s", hash)
goto CONTINUE
}
if size, ok := sizeMap[hash]; ok {
} else if size, ok := sizeMap[hash]; ok {
if size != f.Size {
logWarnf("Found modified file: size of %q is %d, expect %d", hash, size, f.Size)
goto MISSING
}
if heavy {
addMissing(f)
} else if heavy {
hashMethod, err := getHashMethod(len(hash))
if err != nil {
logErrorf("Unknown hash method for %q", hash)
goto CONTINUE
}
hw := hashMethod.New()

r, err := storage.Open(hash)
if err != nil {
logErrorf("Could not open %q: %v", hash, err)
goto MISSING
}
_, err = io.CopyBuffer(hw, r, buf[:])
r.Close()
if err != nil {
logErrorf("Could not calculate hash for %q: %v", hash, err)
goto CONTINUE
}
if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash {
logWarnf("Found modified file: hash of %q is %s, expect %s", hash, hs, f.Hash)
goto MISSING
} else {
if !sema.AcquireWithContext(ctx) {
return
}
go func(f FileInfo) {
defer sema.Release()
r, err := storage.Open(hash)
if err != nil {
logErrorf("Could not open %q: %v", hash, err)
} else {
hw := hashMethod.New()
_, err = io.CopyBuffer(hw, r, buf[:])
r.Close()
if err != nil {
logErrorf("Could not calculate hash for %s: %v", hash, err)
} else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != hash {
logWarnf("Found modified file: hash of %s became %s", hash, hs)
} else {
return
}
}
addMissing(f)
bar.EwmaIncrement(time.Since(start))
}(f)
continue
}
}
goto CONTINUE
} else {
logDebugf("Could not found file %q", hash)
addMissing(f)
}
logDebugf("Could not found file %q", hash)
MISSING:
addMissing(f)
CONTINUE:
bar.EwmaIncrement(time.Since(start))
}

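The parallel check above leans on a Semaphore helper (NewSemaphore, Acquire/Release, AcquireWithContext, Len, Cap) whose definition is not part of the hunks shown in this commit. The sketch below is only an illustration of what a channel-based version of that API could look like, assuming a non-positive capacity means "unlimited"; it is not the repository's implementation.

    // Illustrative sketch only -- NOT the repository's Semaphore implementation.
    // It mirrors the API the diff relies on: NewSemaphore, Acquire, Release,
    // AcquireWithContext, Len and Cap, with n <= 0 treated as "unlimited".
    package util

    import "context"

    type Semaphore struct {
    	ch chan struct{} // nil channel means no limit
    }

    func NewSemaphore(n int) *Semaphore {
    	if n <= 0 {
    		return &Semaphore{}
    	}
    	return &Semaphore{ch: make(chan struct{}, n)}
    }

    // Acquire blocks until a slot is free.
    func (s *Semaphore) Acquire() {
    	if s.ch != nil {
    		s.ch <- struct{}{}
    	}
    }

    // AcquireWithContext blocks until a slot is free or ctx is done and
    // reports whether a slot was actually acquired.
    func (s *Semaphore) AcquireWithContext(ctx context.Context) bool {
    	if s.ch == nil {
    		return ctx.Err() == nil
    	}
    	select {
    	case s.ch <- struct{}{}:
    		return true
    	case <-ctx.Done():
    		return false
    	}
    }

    // Release frees one previously acquired slot.
    func (s *Semaphore) Release() {
    	if s.ch != nil {
    		<-s.ch
    	}
    }

    // Len is the number of slots currently in use; Cap is the total capacity.
    func (s *Semaphore) Len() int { return len(s.ch) }
    func (s *Semaphore) Cap() int { return cap(s.ch) }

With this shape, the progress-bar decorator's "(%d / %d)" simply reports Len() over Cap(), i.e. how many of the storage's readers are currently busy.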
cmd_webdav.go (23 changes: 17 additions & 6 deletions)
@@ -68,7 +68,11 @@ func cmdUploadWebdav(args []string) {
logInfof("From: %s", local.String())

webdavs := make([]*WebDavStorage, len(webdavOpts))
maxProc := 0
for i, opt := range webdavOpts {
if opt.MaxConn > maxProc {
maxProc = opt.MaxConn
}
s := new(WebDavStorage)
s.SetOptions(opt)
if err := s.Init(ctx, nil); err != nil {
@@ -79,10 +83,11 @@ webdavs[i] = s
webdavs[i] = s
}

var barUnit decor.SizeB1024
maxProc := runtime.GOMAXPROCS(0) * 4
if maxProc < 1 {
maxProc = 1
maxProc = runtime.GOMAXPROCS(0) * 4
if maxProc < 1 {
maxProc = 1
}
}
slots := make(chan int, maxProc)
for i := 0; i < maxProc; i++ {
@@ -109,6 +114,7 @@
return nil
})

var barUnit decor.SizeB1024
var wg sync.WaitGroup
pg := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithAutoRefresh())
setLogOutput(pg)
@@ -124,12 +130,15 @@
decor.EwmaETA(decor.ET_STYLE_GO, 30),
),
)
webdavBar.SetTotal((int64)(len(localFiles)*len(webdavs)), false)
webdavBar.SetTotal((int64)(len(localFiles)*2*len(webdavs)), false)
for i, s := range webdavs {
start := time.Now()
fileSet := make(map[string]int64)
err := s.WalkDir(func(hash string, size int64) error {
fileSet[hash] = size
now := time.Now()
webdavBar.EwmaIncrement(now.Sub(start))
start = now
return nil
})
if err != nil {
@@ -145,10 +154,12 @@
})
totalSize += size
}
now := time.Now()
webdavBar.EwmaIncrement(now.Sub(start))
start = now
}
totalFiles += (int64)(len(files))
webdavFiles[i] = files
webdavBar.EwmaIncrement(time.Since(start))
}
webdavBar.SetTotal(-1, true)
webdavBar.Wait()
@@ -174,7 +185,7 @@
),
)

logDebugf("Max Proc: %d", maxProc)
logInfof("Max Proc: %d", maxProc)

for i, files := range webdavFiles {
s := webdavs[i]
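cmd_webdav.go now sizes its upload slot pool from the largest max-conn among the configured WebDAV storages, apparently falling back to runtime.GOMAXPROCS(0)*4 when no limit is configured, but the hunks only show the slot channel being created and pre-filled. The sketch below shows how such a slot pool is typically drained and refilled around uploads; uploadOne and its signature are assumptions for illustration, not code from this commit.

    // Sketch of the buffered-channel slot pattern used for bounded parallel uploads.
    // Not taken from the commit; uploadOne is a hypothetical helper.
    package uploader

    import "log"

    func uploadAll(files []string, maxProc int, uploadOne func(path string) error) {
    	slots := make(chan int, maxProc)
    	for i := 0; i < maxProc; i++ {
    		slots <- i // pre-fill the pool with maxProc tokens
    	}
    	done := make(chan error)
    	for _, f := range files {
    		slot := <-slots // block until an upload slot is free
    		go func(f string, slot int) {
    			err := uploadOne(f)
    			slots <- slot // return the slot before reporting the result
    			done <- err
    		}(f, slot)
    	}
    	for range files {
    		if err := <-done; err != nil {
    			log.Printf("upload failed: %v", err)
    		}
    	}
    }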
config.yaml (7 changes: 0 additions & 7 deletions)
@@ -33,13 +33,6 @@ storages:
data:
cache-path: cache
compressor: ""
- type: webdav
id: webdav-storage-1
weight: 0
data:
pre-gen-measures: false
follow-redirect: false
redirect-link-cache: 0s
webdav-users:
example-user:
endpoint: https://webdav.example.com/path/to/endpoint/
storage.go (2 changes: 2 additions & 0 deletions)
@@ -41,6 +41,8 @@ type Storage interface {
// Init will be called before starting to use a storage
Init(context.Context, *Cluster) error

// The maximum number of readers that can be open at a time. Zero means unlimited
MaxOpen() int
Size(hash string) (int64, error)
Open(hash string) (io.ReadCloser, error)
Create(hash string, r io.ReadSeeker) error
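The new MaxOpen method lets callers size their own per-storage concurrency limit. Outside this repository the same zero-means-unlimited contract could be honored with golang.org/x/sync/errgroup, as sketched below; checkOne is a hypothetical callback, not part of the commit.

    // Sketch: bounding concurrent reads from a Storage by its MaxOpen() value.
    // checkOne is a placeholder for whatever per-hash work the caller does.
    package checker

    import (
    	"context"

    	"golang.org/x/sync/errgroup"
    )

    func checkAll(ctx context.Context, hashes []string, maxOpen int,
    	checkOne func(ctx context.Context, hash string) error) error {
    	g, ctx := errgroup.WithContext(ctx)
    	if maxOpen > 0 {
    		g.SetLimit(maxOpen) // zero or negative means no limit, per the interface comment
    	}
    	for _, hash := range hashes {
    		hash := hash // capture the loop variable (pre-Go 1.22 semantics)
    		g.Go(func() error { return checkOne(ctx, hash) })
    	}
    	return g.Wait()
    }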
storage_local.go (4 changes: 4 additions & 0 deletions)
@@ -80,6 +80,10 @@ func (s *LocalStorage) Init(context.Context, *Cluster) (err error) {
return
}

func (s *LocalStorage) MaxOpen() int {
return 1024
}

func (s *LocalStorage) hashToPath(hash string) string {
return filepath.Join(s.opt.CachePath, hash[0:2], hash)
}
storage_mount.go (4 changes: 4 additions & 0 deletions)
@@ -117,6 +117,10 @@ func (s *MountStorage) Init(ctx context.Context, _ *Cluster) (err error) {
return
}

func (s *MountStorage) MaxOpen() int {
return 64
}

func (s *MountStorage) hashToPath(hash string) string {
return filepath.Join(s.opt.CachePath(), hash[0:2], hash)
}
storage_webdav.go (42 changes: 40 additions & 2 deletions)
@@ -62,7 +62,7 @@ func (o *WebDavStorageOption) MarshalYAML() (any, error) {

func (o *WebDavStorageOption) UnmarshalYAML(n *yaml.Node) (err error) {
// set default values
o.MaxConn = 16
o.MaxConn = 24
o.PreGenMeasures = false
o.FollowRedirect = false
o.RedirectLinkCache = 0
@@ -106,6 +106,7 @@ type WebDavStorage struct {

cache Cache
cli *gowebdav.Client
slots *Semaphore
}

var _ Storage = (*WebDavStorage)(nil)
@@ -168,6 +169,8 @@ func (s *WebDavStorage) Init(ctx context.Context, cluster *Cluster) (err error)
s.cli = gowebdav.NewClient(s.opt.GetEndPoint(), s.opt.GetUsername(), s.opt.GetPassword())
s.cli.SetHeader("User-Agent", ClusterUserAgentFull)

s.slots = NewSemaphore(s.opt.MaxConn)

if err := s.cli.Mkdir("measure", 0755); err != nil {
if !webdavIsHTTPError(err, http.StatusConflict) {
logWarnf("Could not create measure folder for %s: %v", s.String(), err)
@@ -195,6 +198,10 @@ func (s *WebDavStorage) putFile(path string, r io.ReadSeeker) error {
return err
}
logDebugf("Putting %q", target)

s.slots.Acquire()
defer s.slots.Release()

cr := &countReader{r, 0}
req, err := http.NewRequestWithContext(context.TODO(), http.MethodPut, target, cr)
if err != nil {
@@ -216,20 +223,37 @@
}
}

func (s *WebDavStorage) MaxOpen() int {
if s.opt.MaxConn <= 0 {
return 0
}
if s.opt.MaxConn > 4 {
return s.opt.MaxConn - 4
}
return 1
}

func (s *WebDavStorage) hashToPath(hash string) string {
return path.Join("download", hash[0:2], hash)
}

func (s *WebDavStorage) Size(hash string) (int64, error) {
s.slots.Acquire()
stat, err := s.cli.Stat(s.hashToPath(hash))
s.slots.Release()
if err != nil {
return 0, err
}
return stat.Size(), nil
}

func (s *WebDavStorage) Open(hash string) (r io.ReadCloser, err error) {
r, err = s.cli.ReadStream(s.hashToPath(hash))
s.slots.Acquire()
if r, err = s.cli.ReadStream(s.hashToPath(hash)); err != nil {
s.slots.Release()
return
}
r = s.slots.ProxyReader(r)
return
}

@@ -238,10 +262,15 @@ func (s *WebDavStorage) Create(hash string, r io.ReadSeeker) error {
}

func (s *WebDavStorage) Remove(hash string) error {
s.slots.Acquire()
defer s.slots.Release()
return s.cli.Remove(s.hashToPath(hash))
}

func (s *WebDavStorage) WalkDir(walker func(hash string, size int64) error) error {
s.slots.Acquire()
defer s.slots.Release()

for _, dir := range hex256 {
files, err := s.cli.ReadDir(path.Join("download", dir))
if err != nil {
@@ -318,6 +347,10 @@ func (s *WebDavStorage) ServeDownload(rw http.ResponseWriter, req *http.Request,
cli = http.DefaultClient
}

if !s.slots.AcquireWithContext(req.Context()) {
return 0, req.Context().Err()
}
defer s.slots.Release()
resp, err := cli.Do(tgReq)
if err != nil {
return 0, err
@@ -384,6 +417,11 @@ func (s *WebDavStorage) ServeMeasure(rw http.ResponseWriter, req *http.Request,
copyHeader("If-None-Match", tgReq.Header, req.Header)
copyHeader("If-Match", tgReq.Header, req.Header)
copyHeader("If-Range", tgReq.Header, req.Header)

if !s.slots.AcquireWithContext(req.Context()) {
return req.Context().Err()
}
defer s.slots.Release()
resp, err := noRedirectCli.Do(tgReq)
if err != nil {
return err
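Open now acquires a slot before issuing the WebDAV read and wraps the stream with s.slots.ProxyReader, which presumably keeps the slot held until the caller closes the reader. The sketch below shows what such a wrapper could look like; it is an assumption about ProxyReader's behavior, not the actual implementation (in the diff it is a method on the Semaphore itself).

    // Sketch: a ReadCloser wrapper that gives a semaphore slot back on Close,
    // so a slot acquired in Open() lives exactly as long as the download stream.
    // Illustrative only; the real ProxyReader may differ.
    package util

    import (
    	"io"
    	"sync"
    )

    // releaser is the minimal surface of the semaphore needed here.
    type releaser interface{ Release() }

    type proxyReader struct {
    	io.ReadCloser
    	sema releaser
    	once sync.Once
    }

    // ProxyReader wraps r so that sema.Release() runs exactly once when the
    // reader is closed, even if Close is called more than once.
    func ProxyReader(sema releaser, r io.ReadCloser) io.ReadCloser {
    	return &proxyReader{ReadCloser: r, sema: sema}
    }

    func (p *proxyReader) Close() error {
    	err := p.ReadCloser.Close()
    	p.once.Do(p.sema.Release)
    	return err
    }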
