diff --git a/cluster.go b/cluster.go index e8430d33..631635dc 100644 --- a/cluster.go +++ b/cluster.go @@ -35,6 +35,7 @@ import ( "os" "path" "path/filepath" + "sort" "strings" "sync" "sync/atomic" @@ -588,6 +589,7 @@ func (cr *Cluster) SyncFiles(ctx context.Context, files []FileInfo, heavyCheck b return } + sort.Slice(files, func(i, j int) bool { return files[i].Hash < files[j].Hash }) cr.syncFiles(ctx, files, heavyCheck) fileset := make(map[string]int64, len(files)) @@ -629,24 +631,31 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m defer setLogOutput(nil) var ( - checkingHashMux sync.RWMutex - checkingHash string + checkingHashMux sync.Mutex + checkingHash string + lastCheckingHash string ) bar := pg.AddBar((int64)(len(files)), mpb.BarRemoveOnComplete(), mpb.PrependDecorators( decor.Name("Checking "+storage.String()), - decor.Any(func(decor.Statistics) string { - checkingHashMux.RLock() - defer checkingHashMux.RUnlock() - return "/" + checkingHash - }), ), mpb.AppendDecorators( decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), decor.NewPercentage("%d"), + decor.EwmaETA(decor.ET_STYLE_GO, 30), ), + mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) { + if checkingHashMux.TryLock() { + lastCheckingHash = checkingHash + checkingHashMux.Unlock() + } + if lastCheckingHash != "" { + _, err = fmt.Fprintln(w, " ", lastCheckingHash) + } + return + }), true), ) sizeMap := make(map[string]int64, len(files)) @@ -657,25 +666,26 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m var buf [1024 * 32]byte for _, f := range files { + start := time.Now() hash := f.Hash - checkingHashMux.Lock() - checkingHash = hash - checkingHashMux.Unlock() - bar.Increment() + if checkingHashMux.TryLock() { + checkingHash = hash + checkingHashMux.Unlock() + } if f.Size == 0 { logDebugf("Skipped empty file %s", hash) - continue + goto CONTINUE } if size, ok := sizeMap[hash]; ok { if size != f.Size { - logInfof("Found modified file: size of %q is %d, expect %d", hash, size, f.Size) + logWarnf("Found modified file: size of %q is %d, expect %d", hash, size, f.Size) goto MISSING } if heavy { hashMethod, err := getHashMethod(len(hash)) if err != nil { logErrorf("Unknown hash method for %q", hash) - continue + goto CONTINUE } hw := hashMethod.New() @@ -688,19 +698,26 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m r.Close() if err != nil { logErrorf("Could not calculate hash for %q: %v", hash, err) - continue + goto CONTINUE } if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash { - logInfof("Found modified file: hash of %q is %s, expect %s", hash, hs, f.Hash) + logWarnf("Found modified file: hash of %q is %s, expect %s", hash, hs, f.Hash) goto MISSING } } - continue + goto CONTINUE } logDebugf("Could not found file %q", hash) MISSING: addMissing(f) + CONTINUE: + bar.EwmaIncrement(time.Since(start)) } + + checkingHashMux.Lock() + checkingHash = "" + checkingHashMux.Unlock() + pg.Wait() logInfof("File check finished for %s", storage.String()) return @@ -862,7 +879,10 @@ func (cr *Cluster) gcFor(s Storage) { } func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) (<-chan string, error) { - const maxRetryCount = 5 + const ( + maxRetryCount = 5 + maxTryWithOpen = 3 + ) slotId, buf, free := stats.slots.Alloc(ctx) if buf == nil { @@ -902,13 +922,14 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) ) defer bar.Abort(true) + noOpen := stats.noOpen interval := time.Second for { bar.SetCurrent(0) hashMethod, err := getHashMethod(len(f.Hash)) if err == nil { var path string - if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, stats.noOpen, func(r io.Reader) io.Reader { + if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, func(r io.Reader) io.Reader { return ProxyReader(r, bar, stats.totalBar, &stats.lastInc) }); err == nil { pathRes <- path @@ -922,9 +943,13 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) bar.SetRefill(bar.Current()) logErrorf("Download error %s:\n\t%s", f.Path, err) - if trycount.Add(1) > maxRetryCount { + c := trycount.Add(1) + if c > maxRetryCount { break } + if c > maxTryWithOpen { + noOpen = true + } select { case <-time.After(interval): interval *= 2 diff --git a/go.mod b/go.mod index e3b23ca7..a380677b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/LiterMC/go-openbmclapi go 1.21.6 require ( - github.com/LiterMC/socket.io v0.1.0 + github.com/LiterMC/socket.io v0.1.1 github.com/hamba/avro/v2 v2.18.0 github.com/klauspost/compress v1.17.4 github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/go.sum b/go.sum index 2778e92b..ce4252d5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/LiterMC/socket.io v0.1.0 h1:p3SGNJRKaTldk5Weye1EvKG92l02fLyRgRDmkcLzC7U= github.com/LiterMC/socket.io v0.1.0/go.mod h1:60lM7+qdBnP64Fk2fB6WmAS6HxI6WCdhlcvaSnutx50= +github.com/LiterMC/socket.io v0.1.1 h1:3DDMHFIG73HlUfjrH8bm1WPGHR+bEWbxQEofUr8pQeg= +github.com/LiterMC/socket.io v0.1.1/go.mod h1:60lM7+qdBnP64Fk2fB6WmAS6HxI6WCdhlcvaSnutx50= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=