From e754c935413277d72bd8672cb8b41eb6772597b4 Mon Sep 17 00:00:00 2001 From: zyxkad Date: Tue, 20 Feb 2024 16:56:24 -0700 Subject: [PATCH] make file checking parallel again --- cluster.go | 54 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/cluster.go b/cluster.go index 631635dc..819674d1 100644 --- a/cluster.go +++ b/cluster.go @@ -610,7 +610,13 @@ type fileInfoWithTargets struct { targets []Storage } -func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, missing *SyncMap[string, *fileInfoWithTargets]) { +func (cr *Cluster) checkFileFor( + ctx context.Context, + storage Storage, files []FileInfo, + heavy bool, + missing *SyncMap[string, *fileInfoWithTargets], + pg *mpb.Progress, +) { addMissing := func(f FileInfo) { if info, has := missing.GetOrSet(f.Hash, func() *fileInfoWithTargets { return &fileInfoWithTargets{ @@ -626,10 +632,6 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m logInfof("Start checking files for %s, heavy = %v", storage.String(), heavy) - pg := mpb.New(mpb.WithAutoRefresh(), mpb.WithWidth(140)) - setLogOutput(pg) - defer setLogOutput(nil) - var ( checkingHashMux sync.Mutex checkingHash string @@ -639,7 +641,7 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m bar := pg.AddBar((int64)(len(files)), mpb.BarRemoveOnComplete(), mpb.PrependDecorators( - decor.Name("Checking "+storage.String()), + decor.Name("> Checking "+storage.String()), ), mpb.AppendDecorators( decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), @@ -652,11 +654,13 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m checkingHashMux.Unlock() } if lastCheckingHash != "" { - _, err = fmt.Fprintln(w, " ", lastCheckingHash) + _, err = fmt.Fprintln(w, "\t", lastCheckingHash) } return }), true), ) + defer bar.Wait() + defer bar.Abort(true) sizeMap := make(map[string]int64, len(files)) storage.WalkDir(func(hash string, size int64) error { @@ -666,6 +670,9 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m var buf [1024 * 32]byte for _, f := range files { + if ctx.Err() != nil { + return + } start := time.Now() hash := f.Hash if checkingHashMux.TryLock() { @@ -718,15 +725,36 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m checkingHash = "" checkingHashMux.Unlock() - pg.Wait() logInfof("File check finished for %s", storage.String()) return } func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) error { + pg := mpb.New(mpb.WithAutoRefresh(), mpb.WithWidth(140)) + setLogOutput(pg) + defer setLogOutput(nil) + missingMap := NewSyncMap[string, *fileInfoWithTargets]() + done := make(chan struct{}, 0) + for _, s := range cr.storages { - cr.checkFileFor(s, files, heavyCheck, missingMap) + go func(s Storage) { + defer func() { + select { + case done <- struct{}{}: + case <-ctx.Done(): + } + }() + cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg) + }(s) + } + for i := len(cr.storages); i > 0; i-- { + select { + case <-done: + case <-ctx.Done(): + logWarn("File sync interrupted") + return ctx.Err() + } } totalFiles := len(missingMap.m) @@ -748,6 +776,7 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b } var stats syncStats + stats.pg = pg stats.noOpen = config.NoOpen || syncCfg.Source == "center" stats.slots = NewBufSlots(syncCfg.Concurrency) stats.totalFiles = totalFiles @@ -755,11 +784,6 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b stats.totalSize += f.Size } - pg := mpb.New(mpb.WithAutoRefresh(), mpb.WithWidth(140)) - stats.pg = pg - setLogOutput(pg) - defer setLogOutput(nil) - var barUnit decor.SizeB1024 stats.lastInc.Store(time.Now().UnixNano()) stats.totalBar = pg.AddBar(stats.totalSize, @@ -784,8 +808,6 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b logInfof("Starting sync files, count: %d, total: %s", totalFiles, bytesToUnit((float64)(stats.totalSize))) start := time.Now() - done := make(chan struct{}, 0) - for _, f := range missing { logDebugf("File %s is for %v", f.Hash, f.targets) pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo)